"""
Train Class1 single allele models.
"""
import argparse
import os
import signal
import sys
import time
import traceback
import random
from functools import partial

import numpy
import pandas
import yaml
from mhcnames import normalize_allele_name
import tqdm # progress bar
tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
from .class1_affinity_predictor import Class1AffinityPredictor
from .common import configure_logging, set_keras_backend
from .parallelism import make_worker_pool, cpu_count
# To avoid pickling large matrices to send to child processes when running in
# parallel, we use this global variable as a place to store data. Data that is
# stored here before creating the process pool will be inherited by the child
# processes upon fork(), allowing us to share large data with the workers.
GLOBAL_DATA = {}
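# A minimal sketch of the pattern (hypothetical names), assuming a POSIX
# fork-based start method where children get a copy-on-write view of the
# parent's memory:
#
#     GLOBAL_DATA["train_data"] = big_dataframe      # set before the pool exists
#     pool = multiprocessing.Pool(processes=4)       # children inherit GLOBAL_DATA
#     pool.map(worker_fn, small_work_items)          # workers read GLOBAL_DATA directly
#
# Only the small work items are pickled; the large dataframe is never sent
# over a pipe.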
# Note on parallelization:
# It seems essential currently (tensorflow==1.4.1) that no processes are forked
# after tensorflow has been used at all, which includes merely importing
# keras.backend. So we must make sure not to use tensorflow in the main process
# if we are running in parallel.
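# A rough illustration of the safe ordering (names here are illustrative,
# not part of this module), assuming the fork start method:
#
#     def init_worker():
#         import keras.backend      # first keras/tensorflow import happens post-fork
#
#     pool = multiprocessing.Pool(initializer=init_worker)   # fork before any TF use
#
# Importing keras/tensorflow in the parent before the fork can leave the
# child processes with unusable TensorFlow state.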
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument(
help=(
"Training data CSV. Expected columns: "
"allele, peptide, measurement_value"))
help="Directory to write models and manifest")
parser.add_argument(
"--hyperparameters",
help="JSON or YAML of hyperparameters")
parser.add_argument(
"--allele",
default=None,
nargs="+",
help="Alleles to train models for. If not specified, all alleles with "
"enough measurements will be used.")
parser.add_argument(
"--min-measurements-per-allele",
type=int,
help="Train models for alleles with >=N measurements.")
parser.add_argument(
"--ignore-inequalities",
action="store_true",
default=False,
help="Do not use affinity value inequalities even when present in data")
parser.add_argument(
"--percent-rank-calibration-num-peptides-per-length",
type=int,
default=int(1e5),
help="Number of peptides per length to use to calibrate percent ranks. "
"Set to 0 to disable percent rank calibration. The resulting models will "
parser.add_argument(
"--n-models",
type=int,
metavar="N",
help="Ensemble size, i.e. how many models to train for each architecture. "
"If specified here it overrides any 'n_models' specified in the "
"hyperparameters.")
parser.add_argument(
"--max-epochs",
type=int,
metavar="N",
help="Max training epochs. If specified here it overrides any 'max_epochs' "
"specified in the hyperparameters.")
parser.add_argument(
    "--num-jobs",
    default=[1],
type=int,
metavar="N",
nargs="+",
help="Number of processes to parallelize training and percent rank "
"calibration over, respectively. Experimental. If only one value is specified "
"then the same number of jobs is used for both phases."
"Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
parser.add_argument(
"--backend",
choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
help="Keras backend. If not specified will use system default.")
parser.add_argument(
    "--gpus",
    type=int,
metavar="N",
help="Number of GPUs to attempt to parallelize across. Requires running "
"in parallel.")
parser.add_argument(
"--max-workers-per-gpu",
type=int,
metavar="N",
default=1000,
help="Maximum number of workers to assign to a GPU. Additional tasks will "
"run on CPU.")
parser.add_argument(
"--save-interval",
type=float,
metavar="N",
default=60,
help="Write models to disk every N seconds. Only affects parallel runs; "
"serial runs write each model to disk as it is trained.")
parser.add_argument(
"--max-tasks-per-worker",
type=int,
metavar="N",
default=None,
help="Restart workers after N tasks. Workaround for tensorflow memory "
"leaks. Requires Python >=3.2.")

def run(argv=sys.argv[1:]):
    global GLOBAL_DATA
# On sigusr1 print stack trace
print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())
    args = parser.parse_args(argv)

    configure_logging(verbose=args.verbosity > 1)

    args.out_models_dir = os.path.abspath(args.out_models_dir)
    hyperparameters_lst = yaml.safe_load(open(args.hyperparameters))
assert isinstance(hyperparameters_lst, list)
print("Loaded hyperparameters list: %s" % str(hyperparameters_lst))
df = pandas.read_csv(args.data)
print("Loaded training data: %s" % (str(df.shape)))
    df = df.loc[
        (df.peptide.str.len() >= 8) & (df.peptide.str.len() <= 15)
    ]
print("Subselected to 8-15mers: %s" % (str(df.shape)))
if args.ignore_inequalities and "measurement_inequality" in df.columns:
print("Dropping measurement_inequality column")
del df["measurement_inequality"]
# Allele counts are in terms of quantitative data only.
allele_counts = (
df.loc[df.measurement_type == "quantitative"].allele.value_counts())
if args.allele:
alleles = [normalize_allele_name(a) for a in args.allele]
else:
        alleles = list(allele_counts.loc[
            allele_counts > args.min_measurements_per_allele
        ].index)
# Allele names in data are assumed to be already normalized.
df = df.loc[df.allele.isin(alleles)].dropna()
print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles)))
print("Training data: %s" % (str(df.shape)))
if args.num_jobs[0] == 1:
worker_pool = None
if args.backend:
set_keras_backend(args.backend)
else:
        num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count()
        worker_init_kwargs = []
print("Attempting to round-robin assign each worker a GPU.")
if args.backend != "tensorflow-default":
print("Forcing keras backend to be tensorflow-default")
args.backend = "tensorflow-default"
gpu_assignments_remaining = dict((
(gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
))
for worker_num in range(num_workers):
if gpu_assignments_remaining:
# Use a GPU
gpu_num = sorted(
gpu_assignments_remaining,
key=lambda key: gpu_assignments_remaining[key])[0]
gpu_assignments_remaining[gpu_num] -= 1
if not gpu_assignments_remaining[gpu_num]:
                    del gpu_assignments_remaining[gpu_num]
                gpu_assignment = [gpu_num]
            else:
# Use CPU
gpu_assignment = []
            worker_init_kwargs.append({
                'gpu_device_nums': gpu_assignment,
'keras_backend': args.backend
})
print("Worker %d assigned GPUs: %s" % (
worker_num, gpu_assignment))
worker_pool = make_worker_pool(
processes=num_workers,
initializer=worker_init,
initializer_kwargs_per_process=worker_init_kwargs,
max_tasks_per_worker=args.max_tasks_per_worker)
if not os.path.exists(args.out_models_dir):
print("Attempting to create directory: %s" % args.out_models_dir)
os.mkdir(args.out_models_dir)
print("Done.")
    predictor = Class1AffinityPredictor()
    work_items = []
    start = time.time()

    for (h, hyperparameters) in enumerate(hyperparameters_lst):
n_models = None
if 'n_models' in hyperparameters:
n_models = hyperparameters.pop("n_models")
if args.n_models:
n_models = args.n_models
if not n_models:
raise ValueError("Specify --ensemble-size or n_models hyperparameter")
if args.max_epochs:
hyperparameters['max_epochs'] = args.max_epochs
for (i, allele) in enumerate(df.allele.unique()):
for model_num in range(n_models):
work_dict = {
'n_models': 1,
'allele_num': i,
'n_alleles': len(alleles),
'hyperparameter_set_num': h,
'num_hyperparameter_sets': len(hyperparameters_lst),
'allele': allele,
'data': None, # subselect from GLOBAL_DATA["train_data"]
'hyperparameters': hyperparameters,
'verbose': args.verbosity,
'progress_print_interval': None if worker_pool else 5.0,
'predictor': predictor if not worker_pool else None,
'save_to': args.out_models_dir if not worker_pool else None,
}
work_items.append(work_dict)
if worker_pool:
print("Processing %d work items in parallel." % len(work_items))
# The estimated time to completion is more accurate if we randomize
# the order of the work.
random.shuffle(work_items)
results_generator = worker_pool.imap_unordered(
train_model_entry_point, work_items, chunksize=1)
unsaved_predictors = []
last_save_time = time.time()
for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
unsaved_predictors.append(new_predictor)
if time.time() > last_save_time + args.save_interval:
# Save current predictor.
save_start = time.time()
new_model_names = predictor.merge_in_place(unsaved_predictors)
predictor.save(
args.out_models_dir, model_names_to_write=new_model_names)
print(
"Saved predictor (%d models total) including %d new models "
"in %0.2f sec to %s" % (
len(predictor.neural_networks),
len(new_model_names),
time.time() - save_start,
args.out_models_dir))
unsaved_predictors.clear()
last_save_time = time.time()
print("Saving final predictor to: %s" % args.out_models_dir)
predictor.merge_in_place(unsaved_predictors)
predictor.save(args.out_models_dir) # write all models just to be sure
print("Done.")
else:
# Run in serial. In this case, every worker is passed the same predictor,
# which it adds models to, so no merging is required. It also saves
# as it goes so no saving is required at the end.
for _ in tqdm.trange(len(work_items)):
item = work_items.pop(0) # want to keep freeing up memory
work_predictor = train_model_entry_point(item)
assert work_predictor is predictor
assert not work_items
    training_time = time.time() - start
print("Trained affinity predictor with %d networks in %0.2f min." % (
len(predictor.neural_networks), training_time / 60.0))
    percent_rank_calibration_time = 0.0
    if args.percent_rank_calibration_num_peptides_per_length > 0:
alleles = list(predictor.supported_alleles)
print("Performing percent rank calibration. Encoding peptides.")
encoded_peptides = predictor.calibrate_percentile_ranks(
alleles=[], # don't actually do any calibration, just return peptides
num_peptides_per_length=args.percent_rank_calibration_num_peptides_per_length)
# Now we encode the peptides for each neural network, so the encoding
# becomes cached.
for network in predictor.neural_networks:
network.peptides_to_network_input(encoded_peptides)
assert encoded_peptides.encoding_cache # must have cached the encoding
print("Finished encoding peptides for percent ranks in %0.2f sec." % (
print("Calibrating percent rank calibration for %d alleles." % len(alleles))
        if args.num_jobs[-1] == 1:
            # Serial run
            results = (
                calibrate_percentile_ranks(
                    allele=allele,
                    predictor=predictor,
                    peptides=encoded_peptides)
                for allele in alleles)
        else:
            # Parallel run
            # Store peptides in global variable so they are in shared memory
            # after fork, instead of needing to be pickled.
            GLOBAL_DATA["calibration_peptides"] = encoded_peptides
            worker_pool = make_worker_pool(
                processes=(
                    args.num_jobs[-1]
                    if args.num_jobs[-1] else None),
                max_tasks_per_worker=args.max_tasks_per_worker)
            results = worker_pool.imap_unordered(
                partial(
                    calibrate_percentile_ranks,
                    predictor=predictor),
                alleles,
                chunksize=1)
for result in tqdm.tqdm(results, total=len(alleles)):
predictor.allele_to_percent_rank_transform.update(result)
print("Done calibrating %d additional alleles." % len(alleles))
predictor.save(args.out_models_dir, model_names_to_write=[])
if worker_pool:
worker_pool.close()
worker_pool.join()
print("Train time: %0.2f min. Percent rank calibration time: %0.2f min." % (
training_time / 60.0, percent_rank_calibration_time / 60.0))
print("Predictor written to: %s" % args.out_models_dir)

def train_model_entry_point(item):
    return train_model(**item)

def train_model(
n_models,
allele_num,
n_alleles,
hyperparameter_set_num,
num_hyperparameter_sets,
allele,
        data,
hyperparameters,
        verbose,
        progress_print_interval,
predictor,
save_to):
if predictor is None:
predictor = Class1AffinityPredictor()
if data is None:
full_data = GLOBAL_DATA["train_data"]
data = full_data.loc[full_data.allele == allele]
subset = hyperparameters.get("train_data", {}).get("subset", "all")
if subset == "quantitative":
data = data.loc[
data.measurement_type == "quantitative"
]
elif subset == "all":
pass
else:
raise ValueError("Unsupported subset: %s" % subset)
    progress_preamble = (
        "[%2d / %2d hyperparameters] "
"[%4d / %4d alleles] %s " % (
hyperparameter_set_num + 1,
num_hyperparameter_sets,
allele_num + 1,
n_alleles,
allele))
train_data = data.sample(frac=1.0)
predictor.fit_allele_specific_predictors(
n_models=n_models,
architecture_hyperparameters_list=[hyperparameters],
allele=allele,
peptides=train_data.peptide.values,
affinities=train_data.measurement_value.values,
inequalities=(
train_data.measurement_inequality.values
if "measurement_inequality" in train_data.columns else None),
models_dir_for_save=save_to,
progress_preamble=progress_preamble,
        verbose=verbose,
        progress_print_interval=progress_print_interval)
return predictor
def calibrate_percentile_ranks(allele, predictor, peptides=None):
"""
Private helper function.
"""
if peptides is None:
peptides = GLOBAL_DATA["calibration_peptides"]
predictor.calibrate_percentile_ranks(
peptides=peptides,
alleles=[allele])
return {
allele: predictor.allele_to_percent_rank_transform[allele],
}
def worker_init(keras_backend=None, gpu_device_nums=None):
# Each worker needs distinct random numbers
numpy.random.seed()
random.seed()
if keras_backend or gpu_device_nums:
print("WORKER pid=%d assigned GPU devices: %s" % (
set_keras_backend(
keras_backend, gpu_device_nums=gpu_device_nums)