Commit 5b4dfbd2 authored by Tim O'Donnell

Experimental support for parallelization (Theano only) in train_allele_specific_models_command

parent c6990666
@@ -9,8 +9,10 @@ import yaml
import time
import signal
import traceback
from multiprocessing import Pool
import pandas
from mhcnames import normalize_allele_name
from .class1_affinity_predictor import Class1AffinityPredictor
from ..common import configure_logging
@@ -78,7 +80,14 @@ parser.add_argument(
    type=int,
    help="Keras verbosity. Default: %(default)s",
    default=1)
parser.add_argument(
    "--parallelization-num-jobs",
    default=1,
    type=int,
    metavar="N",
    help="Parallelization jobs. Experimental. Does NOT work with tensorflow. "
    "Set to 1 for serial run. Set to 0 to use number of cores. "
    "Default: %(default)s.")


def run(argv=sys.argv[1:]):
    # On sigusr1 print stack trace
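The new flag is easiest to see in a full invocation. Below is a minimal sketch that calls the command in-process, mirroring the updated test at the bottom of this commit; the --data and --hyperparameters flag names, the file paths, and the import path are assumptions not shown in this diff.

# Sketch only: exercise the new flag by calling the command in-process.
# Paths and the --data/--hyperparameters flag names are placeholders.
from mhcflurry import train_allele_specific_models_command

args = [
    "--data", "curated_training_data.csv",
    "--hyperparameters", "hyperparameters.yaml",
    "--out-models-dir", "models",
    "--allele", "HLA-A*02:01",
    "--parallelization-num-jobs", "0",  # 0 = one worker per core; 1 = serial
]
train_allele_specific_models_command.run(args)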
@@ -110,7 +119,9 @@ def run(argv=sys.argv[1:]):
    allele_counts = df.allele.value_counts()

    if args.allele:
        alleles = args.allele
        alleles = [normalize_allele_name(a) for a in args.allele]
        # Allele names in data are assumed to be already normalized.
        df = df.ix[df.allele.isin(alleles)]
    else:
        alleles = list(allele_counts.ix[
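For reference, normalize_allele_name from the mhcnames package canonicalizes loosely formatted allele names, so command-line values such as "A0201" match the normalized names expected in the training data. A quick standalone illustration of the expected behavior:

from mhcnames import normalize_allele_name

print(normalize_allele_name("A0201"))        # "HLA-A*02:01"
print(normalize_allele_name("HLA-A*02:01"))  # already normalized; unchanged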
@@ -121,6 +132,15 @@ def run(argv=sys.argv[1:]):
    print("Training data: %s" % (str(df.shape)))

    predictor = Class1AffinityPredictor()

    if args.parallelization_num_jobs == 1:
        # Serial run
        worker_pool = None
    else:
        worker_pool = Pool(
            processes=(
                args.parallelization_num_jobs
                if args.parallelization_num_jobs else None))
        print("Using worker pool: %s" % str(worker_pool))

    if args.out_models_dir and not os.path.exists(args.out_models_dir):
        print("Attempting to create directory: %s" % args.out_models_dir)
@@ -139,29 +159,42 @@ def run(argv=sys.argv[1:]):
        if args.max_epochs:
            hyperparameters['max_epochs'] = args.max_epochs

        for model_group in range(n_models):
            for (i, allele) in enumerate(alleles):
                print(
                    "[%2d / %2d hyperparameters] "
                    "[%2d / %2d replicates] "
                    "[%4d / %4d alleles]: %s" % (
                        h + 1,
                        len(hyperparameters_lst),
                        model_group + 1,
                        n_models,
                        i + 1,
                        len(alleles), allele))

                train_data = df.ix[df.allele == allele].dropna().sample(
                    frac=1.0)
                predictor.fit_allele_specific_predictors(
                    n_models=1,
                    architecture_hyperparameters=hyperparameters,
                    allele=allele,
                    peptides=train_data.peptide.values,
                    affinities=train_data.measurement_value.values,
                    models_dir_for_save=args.out_models_dir)

        work_items = []
        for (i, (allele, sub_df)) in enumerate(df.groupby("allele")):
            for model_group in range(n_models):
                work_dict = {
                    'model_group': model_group,
                    'n_models': n_models,
                    'allele_num': i,
                    'n_alleles': len(alleles),
                    'hyperparameter_set_num': h,
                    'num_hyperparameter_sets': len(hyperparameters_lst),
                    'allele': allele,
                    'sub_df': sub_df,
                    'hyperparameters': hyperparameters,
                    'predictor': predictor if not worker_pool else None,
                    'save_to': args.out_models_dir if not worker_pool else None,
                }
                work_items.append(work_dict)

        if worker_pool:
            print("Processing %d work items in parallel." % len(work_items))
            predictors = worker_pool.map(work_entrypoint, work_items, chunksize=1)
            print("Merging %d predictors fit in parallel." % (len(predictors)))
            predictor = Class1AffinityPredictor.merge([predictor] + predictors)
            print("Saving merged predictor to: %s" % args.out_models_dir)
            predictor.save(args.out_models_dir)
        else:
            # Run in serial. In this case, every worker is passed the same predictor,
            # which it adds models to, so no merging is required. It also saves
            # as it goes so no saving is required at the end.
            for item in work_items:
                work_predictor = work_entrypoint(item)
                assert work_predictor is predictor

    if worker_pool:
        worker_pool.close()
        worker_pool.join()
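In the parallel branch, Pool.map pickles each work item (including its sub_df slice) and sends it to a worker process; since the parent's predictor is deliberately not passed, each worker fits models into its own fresh Class1AffinityPredictor and the parent combines the returned predictors with Class1AffinityPredictor.merge. A generic sketch of this map-then-merge pattern, using toy data rather than mhcflurry objects:

# Toy illustration of the pattern above: workers build independent partial
# results, and the parent merges them once the map completes.
from multiprocessing import Pool

def build_partial(n):
    # Stand-in for work_entrypoint/process_work: build and return a fresh result.
    return {n: n * n}

if __name__ == "__main__":
    pool = Pool(processes=2)
    partials = pool.map(build_partial, range(4), chunksize=1)
    pool.close()
    pool.join()
    merged = {}
    for partial in partials:  # analogous to Class1AffinityPredictor.merge
        merged.update(partial)
    print(merged)  # {0: 0, 1: 1, 2: 4, 3: 9}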
    if args.percent_rank_calibration_num_peptides_per_length > 0:
        start = time.time()
@@ -173,5 +206,50 @@ def run(argv=sys.argv[1:]):
        predictor.save(args.out_models_dir, model_names_to_write=[])


def work_entrypoint(item):
    return process_work(**item)


def process_work(
        model_group,
        n_models,
        allele_num,
        n_alleles,
        hyperparameter_set_num,
        num_hyperparameter_sets,
        allele,
        sub_df,
        hyperparameters,
        predictor,
        save_to):

    if predictor is None:
        predictor = Class1AffinityPredictor()

    print(
        "[%2d / %2d hyperparameters] "
        "[%2d / %2d replicates] "
        "[%4d / %4d alleles]: %s" % (
            hyperparameter_set_num + 1,
            num_hyperparameter_sets,
            model_group + 1,
            n_models,
            allele_num + 1,
            n_alleles,
            allele))

    train_data = sub_df.dropna().sample(frac=1.0)
    predictor.fit_allele_specific_predictors(
        n_models=1,
        architecture_hyperparameters=hyperparameters,
        allele=allele,
        peptides=train_data.peptide.values,
        affinities=train_data.measurement_value.values,
        models_dir_for_save=save_to)
    return predictor
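One small idiom in process_work worth calling out: dropna().sample(frac=1.0) drops rows with missing values and then returns all remaining rows in random order, i.e. a shuffle of the training data. A toy illustration with invented data:

import pandas

df = pandas.DataFrame({
    "peptide": ["SIINFEKL", "SIINFEKD", "QAAWYLWEV"],
    "measurement_value": [120.0, None, 25.0],
})
train_data = df.dropna().sample(frac=1.0)  # drop the NaN row, shuffle the rest
print(train_data)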
if __name__ == '__main__':
    run()
@@ -50,7 +50,7 @@ HYPERPARAMETERS = [
]


def test_run():
def run_and_check(n_jobs=0):
    models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    hyperparameters_filename = os.path.join(
        models_dir, "hyperparameters.yaml")
@@ -63,6 +63,7 @@ def test_run():
        "--allele", "HLA-A*02:01", "HLA-A*01:01", "HLA-A*03:01",
        "--out-models-dir", models_dir,
        "--percent-rank-calibration-num-peptides-per-length", "10000",
        "--parallelization-num-jobs", str(n_jobs),
    ]
    print("Running with args: %s" % args)
    train_allele_specific_models_command.run(args)
@@ -81,3 +82,10 @@ def test_run():
    print("Deleting: %s" % models_dir)
    shutil.rmtree(models_dir)


def Xtest_run_parallel():
    run_and_check(n_jobs=3)


def test_run_serial():
    run_and_check(n_jobs=1)
\ No newline at end of file
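The Xtest_ prefix presumably keeps the parallel case out of automatic test collection (runners such as nose only pick up functions whose names start with "test"), so it must be invoked explicitly while the feature is experimental. A minimal sketch of running both cases by hand, assuming the functions are module-level as the diff suggests:

if __name__ == "__main__":
    test_run_serial()      # also collected and run automatically by the test runner
    Xtest_run_parallel()   # experimental; only runs when called explicitly (Theano backend)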