From 5b4dfbd205e4c26c6f39c983cd1125038e5bc7d6 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Tue, 28 Nov 2017 13:25:49 -0500
Subject: [PATCH] Experimental support for parallelization (Theano only) in
 train_allele_specific_models_command

---
 .../train_allele_specific_models_command.py       | 128 ++++++++++++++----
 ...st_train_allele_specific_models_command.py     |  10 +-
 2 files changed, 112 insertions(+), 26 deletions(-)

diff --git a/mhcflurry/class1_affinity_prediction/train_allele_specific_models_command.py b/mhcflurry/class1_affinity_prediction/train_allele_specific_models_command.py
index 5cf79dec..f71147e0 100644
--- a/mhcflurry/class1_affinity_prediction/train_allele_specific_models_command.py
+++ b/mhcflurry/class1_affinity_prediction/train_allele_specific_models_command.py
@@ -9,8 +9,10 @@ import yaml
 import time
 import signal
 import traceback
+from multiprocessing import Pool
 
 import pandas
+from mhcnames import normalize_allele_name
 
 from .class1_affinity_predictor import Class1AffinityPredictor
 from ..common import configure_logging
@@ -78,7 +80,14 @@ parser.add_argument(
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=1)
-
+parser.add_argument(
+    "--parallelization-num-jobs",
+    default=1,
+    type=int,
+    metavar="N",
+    help="Parallelization jobs. Experimental. Does NOT work with tensorflow. "
+    "Set to 1 for serial run. Set to 0 to use number of cores. "
+    "Default: %(default)s.")
 
 def run(argv=sys.argv[1:]):
     # On sigusr1 print stack trace
@@ -110,7 +119,9 @@ def run(argv=sys.argv[1:]):
     allele_counts = df.allele.value_counts()
 
     if args.allele:
-        alleles = args.allele
+        alleles = [normalize_allele_name(a) for a in args.allele]
+
+        # Allele names in data are assumed to be already normalized.
         df = df.ix[df.allele.isin(alleles)]
     else:
         alleles = list(allele_counts.ix[
@@ -121,6 +132,15 @@ def run(argv=sys.argv[1:]):
     print("Training data: %s" % (str(df.shape)))
 
     predictor = Class1AffinityPredictor()
+    if args.parallelization_num_jobs == 1:
+        # Serial run
+        worker_pool = None
+    else:
+        worker_pool = Pool(
+            processes=(
+                args.parallelization_num_jobs
+                if args.parallelization_num_jobs else None))
+        print("Using worker pool: %s" % str(worker_pool))
 
     if args.out_models_dir and not os.path.exists(args.out_models_dir):
         print("Attempting to create directory: %s" % args.out_models_dir)
@@ -139,29 +159,42 @@ def run(argv=sys.argv[1:]):
         if args.max_epochs:
             hyperparameters['max_epochs'] = args.max_epochs
 
-        for model_group in range(n_models):
-            for (i, allele) in enumerate(alleles):
-                print(
-                    "[%2d / %2d hyperparameters] "
-                    "[%2d / %2d replicates] "
-                    "[%4d / %4d alleles]: %s" % (
-                        h + 1,
-                        len(hyperparameters_lst),
-                        model_group + 1,
-                        n_models,
-                        i + 1,
-                        len(alleles), allele))
-
-                train_data = df.ix[df.allele == allele].dropna().sample(
-                    frac=1.0)
-
-                predictor.fit_allele_specific_predictors(
-                    n_models=1,
-                    architecture_hyperparameters=hyperparameters,
-                    allele=allele,
-                    peptides=train_data.peptide.values,
-                    affinities=train_data.measurement_value.values,
-                    models_dir_for_save=args.out_models_dir)
+        work_items = []
+        for (i, (allele, sub_df)) in enumerate(df.groupby("allele")):
+            for model_group in range(n_models):
+                work_dict = {
+                    'model_group': model_group,
+                    'n_models': n_models,
+                    'allele_num': i,
+                    'n_alleles': len(alleles),
+                    'hyperparameter_set_num': h,
+                    'num_hyperparameter_sets': len(hyperparameters_lst),
+                    'allele': allele,
+                    'sub_df': sub_df,
+                    'hyperparameters': hyperparameters,
+                    'predictor': predictor if not worker_pool else None,
+                    'save_to': args.out_models_dir if not worker_pool else None,
+                }
+                work_items.append(work_dict)
+
+        if worker_pool:
+            print("Processing %d work items in parallel." % len(work_items))
+            predictors = worker_pool.map(work_entrypoint, work_items, chunksize=1)
+            print("Merging %d predictors fit in parallel." % (len(predictors)))
+            predictor = Class1AffinityPredictor.merge([predictor] + predictors)
+            print("Saving merged predictor to: %s" % args.out_models_dir)
+            predictor.save(args.out_models_dir)
+        else:
+            # Run in serial. In this case, every worker is passed the same predictor,
+            # which it adds models to, so no merging is required. It also saves
+            # as it goes so no saving is required at the end.
+            for item in work_items:
+                work_predictor = work_entrypoint(item)
+                assert work_predictor is predictor
+
+    if worker_pool:
+        worker_pool.close()
+        worker_pool.join()
 
     if args.percent_rank_calibration_num_peptides_per_length > 0:
         start = time.time()
@@ -173,5 +206,50 @@ def run(argv=sys.argv[1:]):
     predictor.save(args.out_models_dir, model_names_to_write=[])
 
 
+def work_entrypoint(item):
+    return process_work(**item)
+
+
+def process_work(
+        model_group,
+        n_models,
+        allele_num,
+        n_alleles,
+        hyperparameter_set_num,
+        num_hyperparameter_sets,
+        allele,
+        sub_df,
+        hyperparameters,
+        predictor,
+        save_to):
+
+    if predictor is None:
+        predictor = Class1AffinityPredictor()
+
+    print(
+        "[%2d / %2d hyperparameters] "
+        "[%2d / %2d replicates] "
+        "[%4d / %4d alleles]: %s" % (
+            hyperparameter_set_num + 1,
+            num_hyperparameter_sets,
+            model_group + 1,
+            n_models,
+            allele_num + 1,
+            n_alleles,
+            allele))
+
+    train_data = sub_df.dropna().sample(frac=1.0)
+    predictor.fit_allele_specific_predictors(
+        n_models=1,
+        architecture_hyperparameters=hyperparameters,
+        allele=allele,
+        peptides=train_data.peptide.values,
+        affinities=train_data.measurement_value.values,
+        models_dir_for_save=save_to)
+
+    return predictor
+
+
+
 if __name__ == '__main__':
     run()
diff --git a/test/test_train_allele_specific_models_command.py b/test/test_train_allele_specific_models_command.py
index 19cde4f2..a556cc9a 100644
--- a/test/test_train_allele_specific_models_command.py
+++ b/test/test_train_allele_specific_models_command.py
@@ -50,7 +50,7 @@ HYPERPARAMETERS = [
 ]
 
 
-def test_run():
+def run_and_check(n_jobs=0):
     models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
     hyperparameters_filename = os.path.join(
         models_dir, "hyperparameters.yaml")
@@ -63,6 +63,7 @@ def test_run():
         "--allele", "HLA-A*02:01", "HLA-A*01:01", "HLA-A*03:01",
         "--out-models-dir", models_dir,
         "--percent-rank-calibration-num-peptides-per-length", "10000",
+        "--parallelization-num-jobs", str(n_jobs),
     ]
     print("Running with args: %s" % args)
     train_allele_specific_models_command.run(args)
@@ -81,3 +82,10 @@ def test_run():
 
     print("Deleting: %s" % models_dir)
     shutil.rmtree(models_dir)
+
+def Xtest_run_parallel():
+    run_and_check(n_jobs=3)
+
+
+def test_run_serial():
+    run_and_check(n_jobs=1)
\ No newline at end of file
-- 
GitLab
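
A note on the approach: the parallel path above is a plain map-then-merge. Each work item (one allele / replicate / hyperparameter set) is shipped to a worker process, the worker trains and returns a predictor, and the parent merges the results with Class1AffinityPredictor.merge. Below is a minimal, self-contained sketch of the same pattern, using generic placeholder names rather than the mhcflurry API:

from multiprocessing import Pool


def train_one(item):
    # Stand-in for work_entrypoint/process_work in the patch: handle one
    # (allele, replicate) work item in a worker process and return the result
    # to the parent.
    allele, replicate = item
    return {"allele": allele, "replicate": replicate}


if __name__ == "__main__":
    work_items = [
        (allele, rep)
        for allele in ["HLA-A*02:01", "HLA-A*01:01"]
        for rep in range(2)]
    pool = Pool(processes=2)
    # chunksize=1 keeps scheduling granular, as in the patch. Results are
    # pickled back to the parent, which then merges them (the patch uses
    # Class1AffinityPredictor.merge for this step).
    results = pool.map(train_one, work_items, chunksize=1)
    pool.close()
    pool.join()
    merged = {}
    for result in results:
        merged.setdefault(result["allele"], []).append(result["replicate"])
    print(merged)

This is also why the work items carry predictor=None and save_to=None when a pool is in use: each worker builds its own Class1AffinityPredictor and saving is deferred until after the merge, whereas the serial path reuses one shared predictor and saves as it goes.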