diff --git a/downloads-generation/models_class1_unselected/GENERATE.sh b/downloads-generation/models_class1_unselected/GENERATE.sh
index f04e23ddf9d603a6a255a86b1b9d0f6bf132e241..adf5c7ae230c44aecf78bc6506a4ec0f13464211 100755
--- a/downloads-generation/models_class1_unselected/GENERATE.sh
+++ b/downloads-generation/models_class1_unselected/GENERATE.sh
@@ -17,8 +17,6 @@ rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
 mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
 
 # Send stdout and stderr to a logfile included with the archive.
-exec > >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
-exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
 
 # Log some environment info
 date
diff --git a/mhcflurry/class1_affinity_predictor.py b/mhcflurry/class1_affinity_predictor.py
index aa41f0cf739f340293f6710b45ab29a54869c4fa..4a4144ea6ba7b9dbd1491c1b24346390e8cb080f 100644
--- a/mhcflurry/class1_affinity_predictor.py
+++ b/mhcflurry/class1_affinity_predictor.py
@@ -156,6 +156,56 @@ class Class1AffinityPredictor(object):
             allele_to_fixed_length_sequence=allele_to_fixed_length_sequence
         )
 
+    def merge_in_place(self, others):
+        """
+        Add the models present in other predictors into the current predictor.
+
+        Parameters
+        ----------
+        others : list of Class1AffinityPredictor
+            Other predictors to merge into the current predictor.
+
+        Returns
+        -------
+        list of string : names of newly added models
+        """
+
+        new_model_names = []
+        for predictor in others:
+            for model in predictor.class1_pan_allele_models:
+                model_name = self.model_name(
+                    "pan-class1",
+                    len(self.class1_pan_allele_models))
+                self.class1_pan_allele_models.append(model)
+                row = pandas.Series(collections.OrderedDict([
+                    ("model_name", model_name),
+                    ("allele", "pan-class1"),
+                    ("config_json", json.dumps(model.get_config())),
+                    ("model", model),
+                ])).to_frame().T
+                self.manifest_df = pandas.concat(
+                    [self.manifest_df, row], ignore_index=True)
+                new_model_names.append(model_name)
+
+            for allele in predictor.allele_to_allele_specific_models:
+                if allele not in self.allele_to_allele_specific_models:
+                    self.allele_to_allele_specific_models[allele] = []
+                current_models = self.allele_to_allele_specific_models[allele]
+                for model in predictor.allele_to_allele_specific_models[allele]:
+                    model_name = self.model_name(allele, len(current_models))
+                    row = pandas.Series(collections.OrderedDict([
+                        ("model_name", model_name),
+                        ("allele", allele),
+                        ("config_json", json.dumps(model.get_config())),
+                        ("model", model),
+                    ])).to_frame().T
+                    self.manifest_df = pandas.concat(
+                        [self.manifest_df, row], ignore_index=True)
+                    current_models.append(model)
+                    new_model_names.append(model_name)
+
+        return new_model_names
+
     @property
     def supported_alleles(self):
         """
diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py
index dd180995443112238b6b43dd2f4e22d1d0085161..1711a8a2097a52ed575ecf428de6eabe88c88e00 100644
--- a/mhcflurry/train_allele_specific_models_command.py
+++ b/mhcflurry/train_allele_specific_models_command.py
@@ -125,7 +125,13 @@ parser.add_argument(
     default=1000,
     help="Maximum number of workers to assign to a GPU. Additional tasks will "
     "run on CPU.")
-
+parser.add_argument(
+    "--save-interval",
+    type=float,
+    metavar="N",
+    default=60,
+    help="Write models to disk every N seconds. Only affects parallel runs; "
+    "serial runs write each model to disk as it is trained.")
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -136,6 +142,8 @@ def run(argv=sys.argv[1:]):
 
     args = parser.parse_args(argv)
 
+    args.out_models_dir = os.path.abspath(args.out_models_dir)
+
     configure_logging(verbose=args.verbosity > 1)
 
     hyperparameters_lst = yaml.load(open(args.hyperparameters))
@@ -223,7 +231,7 @@ def run(argv=sys.argv[1:]):
             processes=num_workers)
         print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))
 
-    if args.out_models_dir and not os.path.exists(args.out_models_dir):
+    if not os.path.exists(args.out_models_dir):
         print("Attempting to create directory: %s" % args.out_models_dir)
         os.mkdir(args.out_models_dir)
         print("Done.")
@@ -243,23 +251,21 @@ def run(argv=sys.argv[1:]):
             hyperparameters['max_epochs'] = args.max_epochs
 
         for (i, allele) in enumerate(df.allele.unique()):
-            for model_group in range(n_models):
-                work_dict = {
-                    'model_group': model_group,
-                    'n_models': n_models,
-                    'allele_num': i,
-                    'n_alleles': len(alleles),
-                    'hyperparameter_set_num': h,
-                    'num_hyperparameter_sets': len(hyperparameters_lst),
-                    'allele': allele,
-                    'data': None,  # subselect from GLOBAL_DATA["train_data"]
-                    'hyperparameters': hyperparameters,
-                    'verbose': args.verbosity,
-                    'progress_print_interval': None if worker_pool else 5.0,
-                    'predictor': predictor if not worker_pool else None,
-                    'save_to': args.out_models_dir if not worker_pool else None,
-                }
-                work_items.append(work_dict)
+            work_dict = {
+                'n_models': n_models,
+                'allele_num': i,
+                'n_alleles': len(alleles),
+                'hyperparameter_set_num': h,
+                'num_hyperparameter_sets': len(hyperparameters_lst),
+                'allele': allele,
+                'data': None,  # subselect from GLOBAL_DATA["train_data"]
+                'hyperparameters': hyperparameters,
+                'verbose': args.verbosity,
+                'progress_print_interval': None if worker_pool else 5.0,
+                'predictor': predictor if not worker_pool else None,
+                'save_to': args.out_models_dir if not worker_pool else None,
+            }
+            work_items.append(work_dict)
 
     if worker_pool:
         print("Processing %d work items in parallel." % len(work_items))
@@ -268,23 +274,35 @@ def run(argv=sys.argv[1:]):
         # the order of the work.
         random.shuffle(work_items)
 
-        # We sort here so the predictors are in order of hyperparameter set num.
-        # This is convenient so that the neural networks get merged for each
-        # allele in the same order.
-        predictors = [
-            predictor for (_, predictor)
-            in sorted(
-                tqdm.tqdm(
-                    worker_pool.imap_unordered(
-                        train_model_entrypoint, work_items, chunksize=1),
-                    total=len(work_items)),
-                key=lambda pair: pair[0])
-        ]
+        results_generator = worker_pool.imap_unordered(
+            train_model_entrypoint, work_items, chunksize=1)
+
+        unsaved_predictors = []
+        last_save_time = time.time()
+        for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
+            unsaved_predictors.append(new_predictor)
+
+            if time.time() > last_save_time + args.save_interval:
+                # Save current predictor.
+                save_start = time.time()
+                new_model_names = predictor.merge_in_place(unsaved_predictors)
+                predictor.save(
+                    args.out_models_dir, model_names_to_write=new_model_names)
+                print(
+                    "Saved predictor (%d models total) including %d new models "
+                    "in %0.2f sec to %s" % (
+                        len(predictor.neural_networks),
+                        len(new_model_names),
+                        time.time() - save_start,
+                        args.out_models_dir))
+                unsaved_predictors.clear()
+                last_save_time = time.time()
+
+        print("Saving final predictor to: %s" % args.out_models_dir)
+        predictor.merge_in_place(unsaved_predictors)
+        predictor.save(args.out_models_dir)  # write all models just to be sure
+        print("Done.")
 
-        print("Merging %d predictors fit in parallel." % (len(predictors)))
-        predictor = Class1AffinityPredictor.merge([predictor] + predictors)
-        print("Saving merged predictor to: %s" % args.out_models_dir)
-        predictor.save(args.out_models_dir)
     else:
         # Run in serial. In this case, every worker is passed the same predictor,
         # which it adds models to, so no merging is required. It also saves
@@ -373,7 +391,6 @@ def train_model_entrypoint(item):
 
 
 def train_model(
-        model_group,
         n_models,
         allele_num,
         n_alleles,
@@ -404,23 +421,18 @@ def train_model(
     else:
         raise ValueError("Unsupported subset: %s" % subset)
 
-    hyperparameters.setdefault("train_data", {})["num_points"] = len(data)
-
     progress_preamble = (
         "[%2d / %2d hyperparameters] "
-        "[%4d / %4d alleles] "
-        "[%2d / %2d replicates]: %s " % (
+        "[%4d / %4d alleles] %s " % (
             hyperparameter_set_num + 1,
             num_hyperparameter_sets,
             allele_num + 1,
             n_alleles,
-            model_group + 1,
-            n_models,
             allele))
 
     train_data = data.sample(frac=1.0)
-    (model,) = predictor.fit_allele_specific_predictors(
-        n_models=1,
+    predictor.fit_allele_specific_predictors(
+        n_models=n_models,
         architecture_hyperparameters_list=[hyperparameters],
         allele=allele,
         peptides=train_data.peptide.values,
@@ -433,16 +445,7 @@ def train_model(
         progress_print_interval=progress_print_interval,
         verbose=verbose)
 
-    if allele_num == 0 and model_group == 0:
-        # For the first model for the first allele, print the architecture.
-        print("*** HYPERPARAMETER SET %d***" %
-              (hyperparameter_set_num + 1))
-        pprint(hyperparameters)
-        print("*** ARCHITECTURE FOR HYPERPARAMETER SET %d***" %
-              (hyperparameter_set_num + 1))
-        model.network(borrow=True).summary()
-
-    return (hyperparameter_set_num, predictor)
+    return predictor
 
 
 def calibrate_percentile_ranks(allele, predictor, peptides=None):
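Usage note (not part of the patch above): a minimal sketch of how the new merge_in_place method and the model_names_to_write argument to save fit together, following the same pattern the parallel training loop now uses every --save-interval seconds. The model directory paths are hypothetical; Class1AffinityPredictor.load is the existing mhcflurry loader.

    # Hedged sketch: combine models from two independently trained model
    # directories ("models-run-1" and "models-run-2" are hypothetical paths).
    from mhcflurry import Class1AffinityPredictor

    base = Class1AffinityPredictor.load("models-run-1")
    other = Class1AffinityPredictor.load("models-run-2")

    # merge_in_place mutates `base` and returns the names of the models it
    # added, as implemented in the patch above.
    new_model_names = base.merge_in_place([other])

    # Write only the newly added models into the existing directory, mirroring
    # the incremental saves performed by the training command during parallel
    # runs.
    base.save("models-run-1", model_names_to_write=new_model_names)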