"""
Train Class1 single allele models.
"""
import argparse
import os
import signal
import sys
import time
import traceback
import random
from functools import partial
from multiprocessing import Pool, Queue, cpu_count
from pprint import pprint

import pandas
import yaml
from mhcnames import normalize_allele_name
import tqdm  # progress bar
tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481

from .class1_affinity_predictor import Class1AffinityPredictor
from .common import configure_logging, set_keras_backend


# To avoid pickling large matrices to send to child processes when running in
# parallel, we use this global variable as a place to store data. Data stored
# here before creating the worker pool will be inherited by the child
# processes upon fork(), allowing us to share large data with the workers
# via shared memory.
GLOBAL_DATA = {}

# Note on parallelization:
# It seems essential currently (tensorflow==1.4.1) that no processes are forked
# after tensorflow has been used at all, which includes merely importing
# keras.backend. So we must make sure not to use tensorflow in the main process
# if we are running in parallel.

parser = argparse.ArgumentParser(usage=__doc__)

parser.add_argument(
    "--data",
    metavar="FILE.csv",
    required=True,
    help=(
        "Training data CSV. Expected columns: "
        "allele, peptide, measurement_value"))
parser.add_argument(
    "--out-models-dir",
    metavar="DIR",
    help="Directory to write models and manifest")
parser.add_argument(
    "--hyperparameters",
    metavar="FILE.json",
    required=True,
    help="JSON or YAML of hyperparameters")
parser.add_argument(
    "--allele",
    default=None,
    nargs="+",
    help="Alleles to train models for. If not specified, all alleles with "
    "enough measurements will be used.")
parser.add_argument(
    "--min-measurements-per-allele",
    type=int,
    default=50,
    help="Train models for alleles with >=N measurements.")
parser.add_argument(
    "--ignore-inequalities",
    action="store_true",
    default=False,
    help="Do not use affinity value inequalities even when present in data")
parser.add_argument(
    "--percent-rank-calibration-num-peptides-per-length",
    type=int,
    metavar="N",
    default=int(1e5),
    help="Number of peptides per length to use to calibrate percent ranks. "
    "Set to 0 to disable percent rank calibration. The resulting models will "
    "not support percent ranks. Default: %(default)s.")
parser.add_argument(
    "--n-models",
    type=int,
    metavar="N",
    help="Ensemble size, i.e. how many models to train for each architecture. "
    "If specified here it overrides any 'n_models' specified in the "
    "hyperparameters.")
parser.add_argument(
    "--max-epochs",
    type=int,
    metavar="N",
    help="Max training epochs. If specified here it overrides any 'max_epochs' "
    "specified in the hyperparameters.")
parser.add_argument(
    "--verbosity",
    type=int,
    help="Keras verbosity. Default: %(default)s",
    default=0)
parser.add_argument(
    "--num-jobs",
    default=[1],
    type=int,
    metavar="N",
    nargs="+",
    help="Number of processes to parallelize training and percent rank "
    "calibration over, respectively. Experimental. If only one value is "
    "specified then the same number of jobs is used for both phases. "
    "Set to 1 for serial run. Set to 0 to use number of cores. "
    "Default: %(default)s.")
parser.add_argument(
    "--backend",
    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
    help="Keras backend. If not specified will use system default.")
parser.add_argument(
    "--gpus",
    type=int,
    metavar="N",
    help="Number of GPUs to attempt to parallelize across. Requires running "
    "in parallel.")
parser.add_argument(
    "--max-workers-per-gpu",
    type=int,
    metavar="N",
    default=1000,
    help="Maximum number of workers to assign to a GPU. Additional tasks will "
    "run on CPU.")


def run(argv=sys.argv[1:]):
    # On sigusr1 print stack trace
    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())

    args = parser.parse_args(argv)

    configure_logging(verbose=args.verbosity > 1)

    hyperparameters_lst = yaml.safe_load(open(args.hyperparameters))
    assert isinstance(hyperparameters_lst, list)
    print("Loaded hyperparameters list: %s" % str(hyperparameters_lst))

    df = pandas.read_csv(args.data)
    print("Loaded training data: %s" % (str(df.shape)))

    df = df.loc[
        (df.peptide.str.len() >= 8) & (df.peptide.str.len() <= 15)
    ]
    print("Subselected to 8-15mers: %s" % (str(df.shape)))

    if args.ignore_inequalities and "measurement_inequality" in df.columns:
        print("Dropping measurement_inequality column")
        del df["measurement_inequality"]

    # Allele counts are in terms of quantitative data only.
    allele_counts = (
        df.loc[df.measurement_type == "quantitative"].allele.value_counts())
    if args.allele:
        alleles = [normalize_allele_name(a) for a in args.allele]
    else:
        alleles = list(allele_counts.loc[
            allele_counts > args.min_measurements_per_allele
        ].index)

    # Allele names in data are assumed to be already normalized.
    df = df.loc[df.allele.isin(alleles)].dropna()

    print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles)))
    print("Training data: %s" % (str(df.shape)))

    GLOBAL_DATA["train_data"] = df
    GLOBAL_DATA["args"] = args

    predictor = Class1AffinityPredictor()
    if args.num_jobs[0] == 1:
        # Serial run.
        print("Running in serial.")
        worker_pool = None
        if args.backend:
            set_keras_backend(args.backend)
    else:
        # Parallel run.
        num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count()

        worker_init_args = None
        if args.gpus:
            print("Attempting to round-robin assign each worker a GPU.")
            if args.backend != "tensorflow-default":
                print("Forcing keras backend to be tensorflow-default")
                args.backend = "tensorflow-default"

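            # Round-robin: each worker takes the GPU with the most unassigned
            # slots remaining; once every GPU is at --max-workers-per-gpu, any
            # further workers fall back to CPU. Assignments are handed out via
            # a queue, consumed once per worker in worker_init_entrypoint.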
            gpu_assignments_remaining = dict((
                (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
            ))
            worker_init_args = Queue()
            for worker_num in range(num_workers):
                if gpu_assignments_remaining:
                    # Use a GPU
                    gpu_num = sorted(
                        gpu_assignments_remaining,
                        key=lambda key: gpu_assignments_remaining[key])[0]
                    gpu_assignments_remaining[gpu_num] -= 1
                    if not gpu_assignments_remaining[gpu_num]:
                        del gpu_assignments_remaining[gpu_num]
                    gpu_assignment = [gpu_num]
                else:
                    # Use CPU
                    gpu_assignment = []

                worker_init_args.put({
                    'gpu_device_nums': gpu_assignment,
                    'keras_backend': args.backend
                })
                print("Worker %d assigned GPUs: %s" % (worker_num, gpu_assignment))
        worker_pool = Pool(
            initializer=worker_init_entrypoint,
            initargs=(worker_init_args,),
            processes=num_workers)
        print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))

    if args.out_models_dir and not os.path.exists(args.out_models_dir):
        print("Attempting to create directory: %s" % args.out_models_dir)
        os.mkdir(args.out_models_dir)
        print("Done.")

    start = time.time()
    work_items = []
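    # Build one work item per (hyperparameter set, allele, model replicate).
    # In serial mode each item trains into the shared predictor and saves as
    # it goes; in parallel mode each worker returns a predictor to be merged.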
    for (h, hyperparameters) in enumerate(hyperparameters_lst):
        n_models = None
        if 'n_models' in hyperparameters:
            n_models = hyperparameters.pop("n_models")
        if args.n_models:
            n_models = args.n_models
        if not n_models:
            raise ValueError("Specify --n-models or the n_models hyperparameter")

        if args.max_epochs:
            hyperparameters['max_epochs'] = args.max_epochs

        for (i, allele) in enumerate(df.allele.unique()):
            for model_group in range(n_models):
                work_dict = {
                    'model_group': model_group,
                    'n_models': n_models,
                    'allele_num': i,
                    'n_alleles': len(alleles),
                    'hyperparameter_set_num': h,
                    'num_hyperparameter_sets': len(hyperparameters_lst),
                    'allele': allele,
                    'data': None,  # subselect from GLOBAL_DATA["train_data"]
                    'hyperparameters': hyperparameters,
                    'verbose': args.verbosity,
                    'progress_print_interval': None if worker_pool else 5.0,
                    'predictor': predictor if not worker_pool else None,
                    'save_to': args.out_models_dir if not worker_pool else None,
                }
                work_items.append(work_dict)

    if worker_pool:
        print("Processing %d work items in parallel." % len(work_items))
        # The estimated time to completion is more accurate if we randomize
        # the order of the work.
        random.shuffle(work_items)

        # We sort here so the predictors are in order of hyperparameter set num.
        # This is convenient so that the neural networks get merged for each
        # allele in the same order.
        predictors = [
            predictor for (_, predictor)
            in sorted(
                tqdm.tqdm(
                    worker_pool.imap_unordered(
                        train_model_entrypoint, work_items, chunksize=1),
                    total=len(work_items)),
                key=lambda pair: pair[0])
        ]

        print("Merging %d predictors fit in parallel." % (len(predictors)))
        predictor = Class1AffinityPredictor.merge([predictor] + predictors)
        print("Saving merged predictor to: %s" % args.out_models_dir)
        predictor.save(args.out_models_dir)
    else:
        # Run in serial. In this case, every worker is passed the same predictor,
        # which it adds models to, so no merging is required. It also saves
        # as it goes so no saving is required at the end.
        for _ in tqdm.trange(len(work_items)):
            item = work_items.pop(0)  # want to keep freeing up memory
            (_, work_predictor) = train_model_entrypoint(item)
            assert work_predictor is predictor
        assert not work_items
    print("*" * 30)
    training_time = time.time() - start
    print("Trained affinity predictor with %d networks in %0.2f min." % (
        len(predictor.neural_networks), training_time / 60.0))
    print("*" * 30)

    if worker_pool:
        worker_pool.close()
        worker_pool.join()
        worker_pool = None

    start = time.time()
    if args.percent_rank_calibration_num_peptides_per_length > 0:
        alleles = list(predictor.supported_alleles)

        print("Performing percent rank calibration. Encoding peptides.")
        encoded_peptides = predictor.calibrate_percentile_ranks(
            alleles=[],  # don't actually do any calibration, just return peptides
            num_peptides_per_length=args.percent_rank_calibration_num_peptides_per_length)

        # Now we encode the peptides for each neural network, so the encoding
        # becomes cached.
        for network in predictor.neural_networks:
            network.peptides_to_network_input(encoded_peptides)
        assert encoded_peptides.encoding_cache  # must have cached the encoding
        print("Finished encoding peptides for percent ranks in %0.2f sec." % (
            time.time() - start))
        print("Calibrating percent rank calibration for %d alleles." % len(alleles))
        if args.num_jobs[-1] == 1:
            # Serial run
            print("Running in serial.")
            worker_pool = None
            results = (
                calibrate_percentile_ranks(
                    allele=allele,
                    predictor=predictor,
                    peptides=encoded_peptides)
                for allele in alleles)
        else:
            # Parallel run
            # Store peptides in global variable so they are in shared memory
            # after fork, instead of needing to be pickled.
            GLOBAL_DATA["calibration_peptides"] = encoded_peptides
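            # The last value of --num-jobs controls this phase; a value of 0
            # becomes processes=None, letting Pool default to cpu_count().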
            worker_pool = Pool(
                processes=(
                    args.num_jobs[-1]
                    if args.num_jobs[-1] else None))
            print("Using worker pool: %s" % str(worker_pool))
            results = worker_pool.imap_unordered(
                partial(
                    calibrate_percentile_ranks,
                    predictor=predictor),
                alleles,
                chunksize=1)
        for result in tqdm.tqdm(results, total=len(alleles)):
            predictor.allele_to_percent_rank_transform.update(result)
        print("Done calibrating %d additional alleles." % len(alleles))
        predictor.save(args.out_models_dir, model_names_to_write=[])

    percent_rank_calibration_time = time.time() - start

    if worker_pool:
        worker_pool.close()
        worker_pool.join()

    print("Train time: %0.2f min. Percent rank calibration time: %0.2f min." % (
        training_time / 60.0, percent_rank_calibration_time / 60.0))
    print("Predictor written to: %s" % args.out_models_dir)


def train_model_entrypoint(item):
    return train_model(**item)


def train_model(
        model_group,
        n_models,
        allele_num,
        n_alleles,
        hyperparameter_set_num,
        num_hyperparameter_sets,
        allele,
        data,
        hyperparameters,
        verbose,
        progress_print_interval,
        predictor,
        save_to):

    if predictor is None:
        predictor = Class1AffinityPredictor()

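    # In parallel runs `data` arrives as None and the allele's training frame
    # is pulled from GLOBAL_DATA, inherited via fork() rather than pickled.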
    if data is None:
        full_data = GLOBAL_DATA["train_data"]
        data = full_data.loc[full_data.allele == allele]

    subset = hyperparameters.get("train_data", {}).get("subset", "all")
    if subset == "quantitative":
        data = data.loc[
            data.measurement_type == "quantitative"
        ]
    elif subset == "all":
        pass
    else:
        raise ValueError("Unsupported subset: %s" % subset)

    hyperparameters.setdefault("train_data", {})["num_points"] = len(data)

    progress_preamble = (
        "[%2d / %2d hyperparameters] "
        "[%4d / %4d alleles] "
        "[%2d / %2d replicates]: %s " % (
            hyperparameter_set_num + 1,
            num_hyperparameter_sets,
            allele_num + 1,
            n_alleles,
            model_group + 1,
            n_models,
            allele))

    train_data = data.sample(frac=1.0)  # shuffle the training data
    (model,) = predictor.fit_allele_specific_predictors(
        n_models=1,
        architecture_hyperparameters_list=[hyperparameters],
        allele=allele,
        peptides=train_data.peptide.values,
        affinities=train_data.measurement_value.values,
        inequalities=(
            train_data.measurement_inequality.values
            if "measurement_inequality" in train_data.columns else None),
        models_dir_for_save=save_to,
        progress_preamble=progress_preamble,
        progress_print_interval=progress_print_interval,
        verbose=verbose)
    if allele_num == 0 and model_group == 0:
        # For the first model for the first allele, print the architecture.
        print("*** HYPERPARAMETER SET %d***" %
              (hyperparameter_set_num + 1))
        pprint(hyperparameters)
        print("*** ARCHITECTURE FOR HYPERPARAMETER SET %d***" %
              (hyperparameter_set_num + 1))
        model.network(borrow=True).summary()

    return (hyperparameter_set_num, predictor)


def calibrate_percentile_ranks(allele, predictor, peptides=None):
    """
    Private helper. Calibrates percent ranks for a single allele and returns
    a one-entry dict mapping the allele to its fitted transform, suitable for
    updating predictor.allele_to_percent_rank_transform.
    """
    global GLOBAL_DATA
    if peptides is None:
        peptides = GLOBAL_DATA["calibration_peptides"]
    predictor.calibrate_percentile_ranks(
        peptides=peptides,
        alleles=[allele])
    return {
        allele: predictor.allele_to_percent_rank_transform[allele],
    }


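# Pool worker initialization: each worker consumes exactly one kwargs dict
# (GPU device assignment and keras backend) from the shared queue.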
def worker_init_entrypoint(arg_queue):
    if arg_queue:
        kwargs = arg_queue.get()
    else:
        kwargs = {}
    worker_init(**kwargs)


def worker_init(keras_backend=None, gpu_device_nums=None):
    if keras_backend or gpu_device_nums:
        print("WORKER pid=%d assigned GPU devices: %s" % (
            os.getpid(), gpu_device_nums))
        set_keras_backend(
            keras_backend, gpu_device_nums=gpu_device_nums)


if __name__ == '__main__':
    run()