From 220aaf314eb6efc15f5d8ee48b102055dc090c70 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Sat, 10 Feb 2018 11:23:00 -0500
Subject: [PATCH] Continually save models as they are trained, even when
 running in parallel
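
Previously, parallel runs merged the fitted predictors and wrote them to
disk only after every work item had finished, so an interrupted run lost
all trained models. This patch adds
Class1AffinityPredictor.merge_in_place(), which appends the models from
other predictors into the current predictor and returns the names of the
newly added models. The parallel training loop uses it to flush newly
trained models to disk every --save-interval seconds (default 60).
Serial runs are unaffected: they already write each model to disk as it
is trained.

In sketch form, the new parallel loop is (names as in
train_allele_specific_models_command.py below):

    unsaved_predictors = []
    last_save_time = time.time()
    for new_predictor in tqdm.tqdm(results_generator,
                                   total=len(work_items)):
        unsaved_predictors.append(new_predictor)
        if time.time() > last_save_time + args.save_interval:
            # Merge accumulated results into the main predictor and
            # write only the newly added models.
            new_model_names = predictor.merge_in_place(unsaved_predictors)
            predictor.save(
                args.out_models_dir, model_names_to_write=new_model_names)
            unsaved_predictors = []
            last_save_time = time.time()

Related changes: work items are now one per allele rather than one per
(allele, replicate) pair, with each task fitting all n_models replicates
itself, so train_model no longer takes a model_group argument and the
per-model architecture printout is removed; GENERATE.sh also no longer
tees stdout/stderr to LOG.txt.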

---
 .../models_class1_unselected/GENERATE.sh      |   2 -
 mhcflurry/class1_affinity_predictor.py        |  50 ++++++++
 .../train_allele_specific_models_command.py   | 111 +++++++++---------
 3 files changed, 107 insertions(+), 56 deletions(-)

diff --git a/downloads-generation/models_class1_unselected/GENERATE.sh b/downloads-generation/models_class1_unselected/GENERATE.sh
index f04e23dd..adf5c7ae 100755
--- a/downloads-generation/models_class1_unselected/GENERATE.sh
+++ b/downloads-generation/models_class1_unselected/GENERATE.sh
@@ -17,8 +17,6 @@ rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
 mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
 
 # Send stdout and stderr to a logfile included with the archive.
-exec >  >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
-exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
 
 # Log some environment info
 date
diff --git a/mhcflurry/class1_affinity_predictor.py b/mhcflurry/class1_affinity_predictor.py
index aa41f0cf..4a4144ea 100644
--- a/mhcflurry/class1_affinity_predictor.py
+++ b/mhcflurry/class1_affinity_predictor.py
@@ -156,6 +156,56 @@ class Class1AffinityPredictor(object):
             allele_to_fixed_length_sequence=allele_to_fixed_length_sequence
         )
 
+    def merge_in_place(self, others):
+        """
+        Add the models present in other predictors into the current predictor.
+
+        Parameters
+        ----------
+        others : list of Class1AffinityPredictor
+            Other predictors to merge into the current predictor.
+
+        Returns
+        -------
+        list of string : names of the newly added models
+        """
+
+        new_model_names = []
+        for predictor in others:
+            for model in predictor.class1_pan_allele_models:
+                model_name = self.model_name(
+                    "pan-class1",
+                    len(self.class1_pan_allele_models))
+                self.class1_pan_allele_models.append(model)
+                row = pandas.Series(collections.OrderedDict([
+                    ("model_name", model_name),
+                    ("allele", "pan-class1"),
+                    ("config_json", json.dumps(model.get_config())),
+                    ("model", model),
+                ])).to_frame().T
+                self.manifest_df = pandas.concat(
+                    [self.manifest_df, row], ignore_index=True)
+                new_model_names.append(model_name)
+
+            for allele in predictor.allele_to_allele_specific_models:
+                if allele not in self.allele_to_allele_specific_models:
+                    self.allele_to_allele_specific_models[allele] = []
+                current_models = self.allele_to_allele_specific_models[allele]
+                for model in predictor.allele_to_allele_specific_models[allele]:
+                    model_name = self.model_name(allele, len(current_models))
+                    row = pandas.Series(collections.OrderedDict([
+                        ("model_name", model_name),
+                        ("allele", allele),
+                        ("config_json", json.dumps(model.get_config())),
+                        ("model", model),
+                    ])).to_frame().T
+                    self.manifest_df = pandas.concat(
+                        [self.manifest_df, row], ignore_index=True)
+                    current_models.append(model)
+                    new_model_names.append(model_name)
+
+        return new_model_names
+
     @property
     def supported_alleles(self):
         """
diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py
index dd180995..1711a8a2 100644
--- a/mhcflurry/train_allele_specific_models_command.py
+++ b/mhcflurry/train_allele_specific_models_command.py
@@ -125,7 +125,13 @@ parser.add_argument(
     default=1000,
     help="Maximum number of workers to assign to a GPU. Additional tasks will "
     "run on CPU.")
-
+parser.add_argument(
+    "--save-interval",
+    type=float,
+    metavar="N",
+    default=60,
+    help="Write models to disk every N seconds. Only affects parallel runs; "
+    "serial runs write each model to disk as it is trained.")
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -136,6 +142,8 @@ def run(argv=sys.argv[1:]):
 
     args = parser.parse_args(argv)
 
+    args.out_models_dir = os.path.abspath(args.out_models_dir)
+
     configure_logging(verbose=args.verbosity > 1)
 
     hyperparameters_lst = yaml.load(open(args.hyperparameters))
@@ -223,7 +231,7 @@ def run(argv=sys.argv[1:]):
             processes=num_workers)
         print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))
 
-    if args.out_models_dir and not os.path.exists(args.out_models_dir):
+    if not os.path.exists(args.out_models_dir):
         print("Attempting to create directory: %s" % args.out_models_dir)
         os.mkdir(args.out_models_dir)
         print("Done.")
@@ -243,23 +251,21 @@ def run(argv=sys.argv[1:]):
             hyperparameters['max_epochs'] = args.max_epochs
 
         for (i, allele) in enumerate(df.allele.unique()):
-            for model_group in range(n_models):
-                work_dict = {
-                    'model_group': model_group,
-                    'n_models': n_models,
-                    'allele_num': i,
-                    'n_alleles': len(alleles),
-                    'hyperparameter_set_num': h,
-                    'num_hyperparameter_sets': len(hyperparameters_lst),
-                    'allele': allele,
-                    'data': None,  # subselect from GLOBAL_DATA["train_data"]
-                    'hyperparameters': hyperparameters,
-                    'verbose': args.verbosity,
-                    'progress_print_interval': None if worker_pool else 5.0,
-                    'predictor': predictor if not worker_pool else None,
-                    'save_to': args.out_models_dir if not worker_pool else None,
-                }
-                work_items.append(work_dict)
+            work_dict = {
+                'n_models': n_models,
+                'allele_num': i,
+                'n_alleles': len(alleles),
+                'hyperparameter_set_num': h,
+                'num_hyperparameter_sets': len(hyperparameters_lst),
+                'allele': allele,
+                'data': None,  # subselect from GLOBAL_DATA["train_data"]
+                'hyperparameters': hyperparameters,
+                'verbose': args.verbosity,
+                'progress_print_interval': None if worker_pool else 5.0,
+                'predictor': predictor if not worker_pool else None,
+                'save_to': args.out_models_dir if not worker_pool else None,
+            }
+            work_items.append(work_dict)
 
     if worker_pool:
         print("Processing %d work items in parallel." % len(work_items))
@@ -268,23 +274,35 @@ def run(argv=sys.argv[1:]):
         # the order of the work.
         random.shuffle(work_items)
 
-        # We sort here so the predictors are in order of hyperparameter set num.
-        # This is convenient so that the neural networks get merged for each
-        # allele in the same order.
-        predictors = [
-            predictor for (_, predictor)
-            in sorted(
-                tqdm.tqdm(
-                    worker_pool.imap_unordered(
-                        train_model_entrypoint, work_items, chunksize=1),
-                    total=len(work_items)),
-                key=lambda pair: pair[0])
-        ]
+        results_generator = worker_pool.imap_unordered(
+            train_model_entrypoint, work_items, chunksize=1)
+
+        unsaved_predictors = []
+        last_save_time = time.time()
+        for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
+            unsaved_predictors.append(new_predictor)
+
+            if time.time() > last_save_time + args.save_interval:
+                # Save current predictor.
+                save_start = time.time()
+                new_model_names = predictor.merge_in_place(unsaved_predictors)
+                predictor.save(
+                    args.out_models_dir, model_names_to_write=new_model_names)
+                print(
+                    "Saved predictor (%d models total) including %d new models "
+                    "in %0.2f sec to %s" % (
+                        len(predictor.neural_networks),
+                        len(new_model_names),
+                        time.time() - save_start,
+                        args.out_models_dir))
+                unsaved_predictors = []  # list.clear() is Python 3 only
+                last_save_time = time.time()
+
+        print("Saving final predictor to: %s" % args.out_models_dir)
+        predictor.merge_in_place(unsaved_predictors)
+        predictor.save(args.out_models_dir)  # write all models just to be sure
+        print("Done.")
 
-        print("Merging %d predictors fit in parallel." % (len(predictors)))
-        predictor = Class1AffinityPredictor.merge([predictor] + predictors)
-        print("Saving merged predictor to: %s" % args.out_models_dir)
-        predictor.save(args.out_models_dir)
     else:
         # Run in serial. In this case, every worker is passed the same predictor,
         # which it adds models to, so no merging is required. It also saves
@@ -373,7 +391,6 @@ def train_model_entrypoint(item):
 
 
 def train_model(
-        model_group,
         n_models,
         allele_num,
         n_alleles,
@@ -404,23 +421,18 @@ def train_model(
     else:
         raise ValueError("Unsupported subset: %s" % subset)
 
-    hyperparameters.setdefault("train_data", {})["num_points"] = len(data)
-
     progress_preamble = (
         "[%2d / %2d hyperparameters] "
-        "[%4d / %4d alleles] "
-        "[%2d / %2d replicates]: %s " % (
+        "[%4d / %4d alleles] %s " % (
             hyperparameter_set_num + 1,
             num_hyperparameter_sets,
             allele_num + 1,
             n_alleles,
-            model_group + 1,
-            n_models,
             allele))
 
     train_data = data.sample(frac=1.0)
-    (model,) = predictor.fit_allele_specific_predictors(
-        n_models=1,
+    predictor.fit_allele_specific_predictors(
+        n_models=n_models,
         architecture_hyperparameters_list=[hyperparameters],
         allele=allele,
         peptides=train_data.peptide.values,
@@ -433,16 +445,7 @@ def train_model(
         progress_print_interval=progress_print_interval,
         verbose=verbose)
 
-    if allele_num == 0 and model_group == 0:
-        # For the first model for the first allele, print the architecture.
-        print("*** HYPERPARAMETER SET %d***" %
-              (hyperparameter_set_num + 1))
-        pprint(hyperparameters)
-        print("*** ARCHITECTURE FOR HYPERPARAMETER SET %d***" %
-              (hyperparameter_set_num + 1))
-        model.network(borrow=True).summary()
-
-    return (hyperparameter_set_num, predictor)
+    return predictor
 
 
 def calibrate_percentile_ranks(allele, predictor, peptides=None):
-- 
GitLab