Skip to content
Snippets Groups Projects
Commit 220aaf31 authored by Tim O'Donnell's avatar Tim O'Donnell
Browse files

Continually save models as they are trained, even when running in parallel

parent 9acc68b5
No related branches found
No related tags found
No related merge requests found
......@@ -17,8 +17,6 @@ rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
# Send stdout and stderr to a logfile included with the archive.
exec > >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
# Log some environment info
date
......
......@@ -156,6 +156,56 @@ class Class1AffinityPredictor(object):
allele_to_fixed_length_sequence=allele_to_fixed_length_sequence
)
def merge_in_place(self, others):
"""
Add the models present other predictors into the current predictor.
Parameters
----------
others : list of Class1AffinityPredictor
Other predictors to merge into the current predictor.
Returns
-------
list of string : names of newly added models
"""
new_model_names = []
for predictor in others:
for model in predictor.class1_pan_allele_models:
model_name = self.model_name(
"pan-class1",
len(self.class1_pan_allele_models))
self.class1_pan_allele_models.append(model)
row = pandas.Series(collections.OrderedDict([
("model_name", model_name),
("allele", "pan-class1"),
("config_json", json.dumps(model.get_config())),
("model", model),
])).to_frame().T
self.manifest_df = pandas.concat(
[self.manifest_df, row], ignore_index=True)
new_model_names.append(model_name)
for allele in predictor.allele_to_allele_specific_models:
if allele not in self.allele_to_allele_specific_models:
self.allele_to_allele_specific_models[allele] = []
current_models = self.allele_to_allele_specific_models[allele]
for model in predictor.allele_to_allele_specific_models[allele]:
model_name = self.model_name(allele, len(current_models))
row = pandas.Series(collections.OrderedDict([
("model_name", model_name),
("allele", allele),
("config_json", json.dumps(model.get_config())),
("model", model),
])).to_frame().T
self.manifest_df = pandas.concat(
[self.manifest_df, row], ignore_index=True)
current_models.append(model)
new_model_names.append(model_name)
return new_model_names
@property
def supported_alleles(self):
"""
......
......@@ -125,7 +125,13 @@ parser.add_argument(
default=1000,
help="Maximum number of workers to assign to a GPU. Additional tasks will "
"run on CPU.")
parser.add_argument(
"--save-interval",
type=float,
metavar="N",
default=60,
help="Write models to disk every N seconds. Only affects parallel runs; "
"serial runs write each model to disk as it is trained.")
def run(argv=sys.argv[1:]):
global GLOBAL_DATA
......@@ -136,6 +142,8 @@ def run(argv=sys.argv[1:]):
args = parser.parse_args(argv)
args.out_models_dir = os.path.abspath(args.out_models_dir)
configure_logging(verbose=args.verbosity > 1)
hyperparameters_lst = yaml.load(open(args.hyperparameters))
......@@ -223,7 +231,7 @@ def run(argv=sys.argv[1:]):
processes=num_workers)
print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))
if args.out_models_dir and not os.path.exists(args.out_models_dir):
if not os.path.exists(args.out_models_dir):
print("Attempting to create directory: %s" % args.out_models_dir)
os.mkdir(args.out_models_dir)
print("Done.")
......@@ -243,23 +251,21 @@ def run(argv=sys.argv[1:]):
hyperparameters['max_epochs'] = args.max_epochs
for (i, allele) in enumerate(df.allele.unique()):
for model_group in range(n_models):
work_dict = {
'model_group': model_group,
'n_models': n_models,
'allele_num': i,
'n_alleles': len(alleles),
'hyperparameter_set_num': h,
'num_hyperparameter_sets': len(hyperparameters_lst),
'allele': allele,
'data': None, # subselect from GLOBAL_DATA["train_data"]
'hyperparameters': hyperparameters,
'verbose': args.verbosity,
'progress_print_interval': None if worker_pool else 5.0,
'predictor': predictor if not worker_pool else None,
'save_to': args.out_models_dir if not worker_pool else None,
}
work_items.append(work_dict)
work_dict = {
'n_models': n_models,
'allele_num': i,
'n_alleles': len(alleles),
'hyperparameter_set_num': h,
'num_hyperparameter_sets': len(hyperparameters_lst),
'allele': allele,
'data': None, # subselect from GLOBAL_DATA["train_data"]
'hyperparameters': hyperparameters,
'verbose': args.verbosity,
'progress_print_interval': None if worker_pool else 5.0,
'predictor': predictor if not worker_pool else None,
'save_to': args.out_models_dir if not worker_pool else None,
}
work_items.append(work_dict)
if worker_pool:
print("Processing %d work items in parallel." % len(work_items))
......@@ -268,23 +274,35 @@ def run(argv=sys.argv[1:]):
# the order of the work.
random.shuffle(work_items)
# We sort here so the predictors are in order of hyperparameter set num.
# This is convenient so that the neural networks get merged for each
# allele in the same order.
predictors = [
predictor for (_, predictor)
in sorted(
tqdm.tqdm(
worker_pool.imap_unordered(
train_model_entrypoint, work_items, chunksize=1),
total=len(work_items)),
key=lambda pair: pair[0])
]
results_generator = worker_pool.imap_unordered(
train_model_entrypoint, work_items, chunksize=1)
unsaved_predictors = []
last_save_time = time.time()
for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
unsaved_predictors.append(new_predictor)
if time.time() > last_save_time + args.save_interval:
# Save current predictor.
save_start = time.time()
new_model_names = predictor.merge_in_place(unsaved_predictors)
predictor.save(
args.out_models_dir, model_names_to_write=new_model_names)
print(
"Saved predictor (%d models total) including %d new models "
"in %0.2f sec to %s" % (
len(predictor.neural_networks),
len(new_model_names),
time.time() - save_start,
args.out_models_dir))
unsaved_predictors.clear()
last_save_time = time.time()
print("Saving final predictor to: %s" % args.out_models_dir)
predictor.merge_in_place(unsaved_predictors)
predictor.save(args.out_models_dir) # write all models just to be sure
print("Done.")
print("Merging %d predictors fit in parallel." % (len(predictors)))
predictor = Class1AffinityPredictor.merge([predictor] + predictors)
print("Saving merged predictor to: %s" % args.out_models_dir)
predictor.save(args.out_models_dir)
else:
# Run in serial. In this case, every worker is passed the same predictor,
# which it adds models to, so no merging is required. It also saves
......@@ -373,7 +391,6 @@ def train_model_entrypoint(item):
def train_model(
model_group,
n_models,
allele_num,
n_alleles,
......@@ -404,23 +421,18 @@ def train_model(
else:
raise ValueError("Unsupported subset: %s" % subset)
hyperparameters.setdefault("train_data", {})["num_points"] = len(data)
progress_preamble = (
"[%2d / %2d hyperparameters] "
"[%4d / %4d alleles] "
"[%2d / %2d replicates]: %s " % (
"[%4d / %4d alleles] %s " % (
hyperparameter_set_num + 1,
num_hyperparameter_sets,
allele_num + 1,
n_alleles,
model_group + 1,
n_models,
allele))
train_data = data.sample(frac=1.0)
(model,) = predictor.fit_allele_specific_predictors(
n_models=1,
predictor.fit_allele_specific_predictors(
n_models=n_models,
architecture_hyperparameters_list=[hyperparameters],
allele=allele,
peptides=train_data.peptide.values,
......@@ -433,16 +445,7 @@ def train_model(
progress_print_interval=progress_print_interval,
verbose=verbose)
if allele_num == 0 and model_group == 0:
# For the first model for the first allele, print the architecture.
print("*** HYPERPARAMETER SET %d***" %
(hyperparameter_set_num + 1))
pprint(hyperparameters)
print("*** ARCHITECTURE FOR HYPERPARAMETER SET %d***" %
(hyperparameter_set_num + 1))
model.network(borrow=True).summary()
return (hyperparameter_set_num, predictor)
return predictor
def calibrate_percentile_ranks(allele, predictor, peptides=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment