"downloads-generation/models_class1_allele_specific_single" did not exist on "925d42fa7cd17ed98b7a6304461e3ab423c0abe5"
Newer
Older
Tim O'Donnell
committed
import traceback
import itertools
from multiprocessing import Pool, Queue
from functools import partial
from pprint import pprint
Tim O'Donnell
committed
from mhcnames import normalize_allele_name
import tqdm # progress bar
tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
from .class1_affinity_predictor import Class1AffinityPredictor
from .common import configure_logging, set_keras_backend
# To avoid pickling large matrices to send to child processes when running in
# parallel, we use this global variable as a place to store data. Data that is
# stored here before creating the thread pool will be inherited to the child
# processes upon fork() call, allowing us to share large data with the workers
GLOBAL_DATA = {}
# Note on parallelization:
# It seems essential currently (tensorflow==1.4.1) that no processes are forked
# after tensorflow has been used at all, which includes merely importing
# keras.backend. So we must make sure not to use tensorflow in the main process
# if we are running in parallel.
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument(
help=(
"Training data CSV. Expected columns: "
"allele, peptide, measurement_value"))
help="Directory to write models and manifest")
parser.add_argument(
"--hyperparameters",
help="JSON or YAML of hyperparameters")
parser.add_argument(
"--allele",
default=None,
nargs="+",
help="Alleles to train models for. If not specified, all alleles with "
"enough measurements will be used.")
parser.add_argument(
"--min-measurements-per-allele",
type=int,
help="Train models for alleles with >=N measurements.")
parser.add_argument(
"--only-quantitative",
action="store_true",
default=False,
help="Use only quantitative training data")
parser.add_argument(
"--ignore-inequalities",
action="store_true",
default=False,
help="Do not use affinity value inequalities even when present in data")
parser.add_argument(
"--percent-rank-calibration-num-peptides-per-length",
type=int,
default=int(1e5),
help="Number of peptides per length to use to calibrate percent ranks. "
"Set to 0 to disable percent rank calibration. The resulting models will "
parser.add_argument(
"--n-models",
type=int,
metavar="N",
help="Ensemble size, i.e. how many models to train for each architecture. "
"If specified here it overrides any 'n_models' specified in the "
"hyperparameters.")
parser.add_argument(
"--max-epochs",
type=int,
metavar="N",
help="Max training epochs. If specified here it overrides any 'max_epochs' "
"specified in the hyperparameters.")
"--verbosity",
Tim O'Donnell
committed
parser.add_argument(
default=[1],
Tim O'Donnell
committed
type=int,
metavar="N",
nargs="+",
help="Number of processes to parallelize training and percent rank "
"calibration over, respectively. Experimental. If only one value is specified "
"then the same number of jobs is used for both phases."
"Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
parser.add_argument(
"--backend",
choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
help="Keras backend. If not specified will use system default.")
type=int,
metavar="N",
help="Number of GPUs to attempt to parallelize across. Requires running "
"in parallel.")
global GLOBAL_DATA
Tim O'Donnell
committed
# On sigusr1 print stack trace
print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())
hyperparameters_lst = yaml.load(open(args.hyperparameters))
assert isinstance(hyperparameters_lst, list)
print("Loaded hyperparameters list: %s" % str(hyperparameters_lst))
df = pandas.read_csv(args.data)
print("Loaded training data: %s" % (str(df.shape)))
df = df.ix[
(df.peptide.str.len() >= 8) & (df.peptide.str.len() <= 15)
]
print("Subselected to 8-15mers: %s" % (str(df.shape)))
if args.only_quantitative:
df = df.loc[
df.measurement_type == "quantitative"
]
print("Subselected to quantitative: %s" % (str(df.shape)))
if args.ignore_inequalities and "measurement_inequality" in df.columns:
print("Dropping measurement_inequality column")
del df["measurement_inequality"]
if args.allele:
Tim O'Donnell
committed
alleles = [normalize_allele_name(a) for a in args.allele]
else:
alleles = list(allele_counts.ix[
allele_counts > args.min_measurements_per_allele
].index)
# Allele names in data are assumed to be already normalized.
df = df.loc[df.allele.isin(alleles)].dropna()
print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles)))
print("Training data: %s" % (str(df.shape)))
if args.num_jobs[0] == 1:
Tim O'Donnell
committed
worker_pool = None
if args.backend:
set_keras_backend(args.backend)
Tim O'Donnell
committed
else:
print("Attempting to round-robin assign each worker a GPU.")
# We assign each worker to a GPU using the CUDA_VISIBLE_DEVICES
# environment variable. To do this, we push environment variables
# onto a queue. Each worker reads a single item from the queue,
# which is a list of environment variables to set.
Tim O'Donnell
committed
worker_pool = Pool(
Tim O'Donnell
committed
processes=(
args.num_jobs[0]
if args.num_jobs[0] else None))
Tim O'Donnell
committed
print("Using worker pool: %s" % str(worker_pool))
if args.out_models_dir and not os.path.exists(args.out_models_dir):
print("Attempting to create directory: %s" % args.out_models_dir)
os.mkdir(args.out_models_dir)
print("Done.")
for (h, hyperparameters) in enumerate(hyperparameters_lst):
n_models = None
if 'n_models' in hyperparameters:
n_models = hyperparameters.pop("n_models")
if args.n_models:
n_models = args.n_models
if not n_models:
raise ValueError("Specify --ensemble-size or n_models hyperparameter")
if args.max_epochs:
hyperparameters['max_epochs'] = args.max_epochs
for (i, allele) in enumerate(df.allele.unique()):
Tim O'Donnell
committed
for model_group in range(n_models):
work_dict = {
'model_group': model_group,
'n_models': n_models,
'allele_num': i,
'n_alleles': len(alleles),
'hyperparameter_set_num': h,
'num_hyperparameter_sets': len(hyperparameters_lst),
'allele': allele,
'data': None, # subselect from GLOBAL_DATA["train_data"]
Tim O'Donnell
committed
'hyperparameters': hyperparameters,
'progress_print_interval': None if worker_pool else 5.0,
Tim O'Donnell
committed
'predictor': predictor if not worker_pool else None,
'save_to': args.out_models_dir if not worker_pool else None,
}
work_items.append(work_dict)
if worker_pool:
print("Processing %d work items in parallel." % len(work_items))
# We sort here so the predictors are in order of hyperparameter set num.
# This is convenient so that the neural networks get merged for each
# allele in the same order.
predictors = [
predictor for (_, predictor)
in sorted(
tqdm.tqdm(
worker_pool.imap_unordered(
train_model_entrypoint, work_items, chunksize=1),
]
print("Merging %d predictors fit in parallel." % (len(predictors)))
predictor = Class1AffinityPredictor.merge([predictor] + predictors)
print("Saving merged predictor to: %s" % args.out_models_dir)
predictor.save(args.out_models_dir)
else:
# Run in serial. In this case, every worker is passed the same predictor,
# which it adds models to, so no merging is required. It also saves
# as it goes so no saving is required at the end.
for _ in tqdm.trange(len(work_items)):
item = work_items.pop(0) # want to keep freeing up memory
(_, work_predictor) = train_model_entrypoint(item)
assert work_predictor is predictor
assert not work_items
Tim O'Donnell
committed
print("Trained affinity predictor with %d networks in %0.2f min." % (
len(predictor.neural_networks), training_time / 60.0))
if args.percent_rank_calibration_num_peptides_per_length > 0:
alleles = list(predictor.supported_alleles)
print("Performing percent rank calibration. Encoding peptides.")
encoded_peptides = predictor.calibrate_percentile_ranks(
alleles=[], # don't actually do any calibration, just return peptides
num_peptides_per_length=args.percent_rank_calibration_num_peptides_per_length)
# Now we encode the peptides for each neural network, so the encoding
# becomes cached.
for network in predictor.neural_networks:
network.peptides_to_network_input(encoded_peptides)
assert encoded_peptides.encoding_cache # must have cached the encoding
print("Finished encoding peptides for percent ranks in %0.2f sec." % (
print("Calibrating percent rank calibration for %d alleles." % len(alleles))
if args.num_jobs[-1] == 1:
results = (
calibrate_percentile_ranks(
allele=allele,
peptides=encoded_peptides)
for allele in alleles)
# Parallel run
# Store peptides in global variable so they are in shared memory
# after fork, instead of needing to be pickled.
GLOBAL_DATA["calibration_peptides"] = encoded_peptides
args.num_jobs[-1]
if args.num_jobs[-1] else None))
results = worker_pool.imap_unordered(
partial(
calibrate_percentile_ranks,
for result in tqdm.tqdm(results, total=len(alleles)):
predictor.allele_to_percent_rank_transform.update(result)
print("Done calibrating %d additional alleles." % len(alleles))
predictor.save(args.out_models_dir, model_names_to_write=[])
if worker_pool:
worker_pool.close()
worker_pool.join()
print("Train time: %0.2f min. Percent rank calibration time: %0.2f min." % (
training_time / 60.0, percent_rank_calibration_time / 60.0))
print("Predictor written to: %s" % args.out_models_dir)
def train_model_entrypoint(item):
return train_model(**item)
Tim O'Donnell
committed
def train_model(
Tim O'Donnell
committed
model_group,
n_models,
allele_num,
n_alleles,
hyperparameter_set_num,
num_hyperparameter_sets,
allele,
Tim O'Donnell
committed
hyperparameters,
Tim O'Donnell
committed
predictor,
save_to):
if predictor is None:
predictor = Class1AffinityPredictor()
if data is None:
full_data = GLOBAL_DATA["train_data"]
data = full_data.loc[full_data.allele == allele]
Tim O'Donnell
committed
"[%2d / %2d hyperparameters] "
"[%4d / %4d alleles] "
"[%2d / %2d replicates]: %s " % (
Tim O'Donnell
committed
hyperparameter_set_num + 1,
num_hyperparameter_sets,
allele_num + 1,
n_alleles,
Tim O'Donnell
committed
allele))
train_data = data.sample(frac=1.0)
(model,) = predictor.fit_allele_specific_predictors(
Tim O'Donnell
committed
n_models=1,
architecture_hyperparameters_list=[hyperparameters],
Tim O'Donnell
committed
allele=allele,
peptides=train_data.peptide.values,
affinities=train_data.measurement_value.values,
inequalities=(
train_data.measurement_inequality.values
if "measurement_inequality" in train_data.columns else None),
models_dir_for_save=save_to,
progress_preamble=progress_preamble,
Tim O'Donnell
committed
if allele_num == 0 and model_group == 0:
# For the first model for the first allele, print the architecture.
print("*** HYPERPARAMETER SET %d***" %
(hyperparameter_set_num + 1))
pprint(hyperparameters)
print("*** ARCHITECTURE FOR HYPERPARAMETER SET %d***" %
(hyperparameter_set_num + 1))
model.network(borrow=True).summary()
return (hyperparameter_set_num, predictor)
Tim O'Donnell
committed
def calibrate_percentile_ranks(allele, predictor, peptides=None):
"""
Private helper function.
"""
if peptides is None:
peptides = GLOBAL_DATA["calibration_peptides"]
predictor.calibrate_percentile_ranks(
peptides=peptides,
alleles=[allele])
return {
allele: predictor.allele_to_percent_rank_transform[allele],
}
def worker_init(env_queue=None):
global GLOBAL_DATA
# The env_queue provides a way for each worker to be configured with a
# specific set of environment variables. We use it to assign GPUs to workers.
if env_queue:
settings = env_queue.get()
print("Setting: ", settings)
os.environ.update(settings)
command_args = GLOBAL_DATA['args']
if command_args.backend:
set_keras_backend(command_args.backend)