From f56a579c631ffba6d73ead99ae10c2764ca6b2a8 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Wed, 10 Jul 2019 17:29:04 -0400
Subject: [PATCH] cluster parallelism

---
 .../GENERATE.WITH_HPC_CLUSTER.sh              | 56 +++++++++++++
 .../cluster_submit_script_header.mssm_hpc.lsf | 15 ++++
 .../calibrate_percentile_ranks_command.py     |  6 +-
 .../{parallelism.py => local_parallelism.py}  |  4 +-
 .../select_allele_specific_models_command.py  |  4 +-
 mhcflurry/select_pan_allele_models_command.py |  4 +-
 .../train_allele_specific_models_command.py   |  6 +-
 mhcflurry/train_pan_allele_models_command.py  | 83 +++++++++++--------
 setup.py                                      |  2 +
 test/test_train_pan_allele_models_command.py  | 14 +++-
 10 files changed, 146 insertions(+), 48 deletions(-)
 create mode 100755 downloads-generation/models_class1_pan_unselected/GENERATE.WITH_HPC_CLUSTER.sh
 create mode 100644 downloads-generation/models_class1_pan_unselected/cluster_submit_script_header.mssm_hpc.lsf
 rename mhcflurry/{parallelism.py => local_parallelism.py} (98%)

diff --git a/downloads-generation/models_class1_pan_unselected/GENERATE.WITH_HPC_CLUSTER.sh b/downloads-generation/models_class1_pan_unselected/GENERATE.WITH_HPC_CLUSTER.sh
new file mode 100755
index 00000000..0a064dfe
--- /dev/null
+++ b/downloads-generation/models_class1_pan_unselected/GENERATE.WITH_HPC_CLUSTER.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+#
+# Train pan-allele MHCflurry Class I models.
+#
+# Uses an HPC cluster (Mount Sinai chimera cluster, which uses lsf job
+# scheduler). This would need to be modified for other sites.
+#
+set -e
+set -x
+
+DOWNLOAD_NAME=models_class1_pan_unselected
+SCRATCH_DIR=${TMPDIR-/tmp}/mhcflurry-downloads-generation
+SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
+SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
+
+mkdir -p "$SCRATCH_DIR"
+rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
+mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
+
+# Send stdout and stderr to a logfile included with the archive.
+exec >  >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
+exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
+
+# Log some environment info
+date
+pip freeze
+git status
+
+cd $SCRATCH_DIR/$DOWNLOAD_NAME
+
+cp $SCRIPT_DIR/generate_hyperparameters.py .
+python generate_hyperparameters.py > hyperparameters.yaml
+
+for kind in with_mass_spec no_mass_spec
+do
+    mhcflurry-class1-train-pan-allele-models \
+        --data "$(mhcflurry-downloads path data_curated)/curated_training_data.${kind}.csv.bz2" \
+        --allele-sequences "$(mhcflurry-downloads path allele_sequences)/allele_sequences.csv" \
+        --pretrain-data "$(mhcflurry-downloads path random_peptide_predictions)/predictions.csv.bz2" \
+        --held-out-measurements-per-allele-fraction-and-max 0.25 100 \
+        --ensemble-size 4 \
+        --hyperparameters hyperparameters.yaml \
+        --out-models-dir models.${kind} \
+        --worker-log-dir "$SCRATCH_DIR/$DOWNLOAD_NAME" \
+        --verbosity 1 \
+        --cluster-parallelism \
+        --cluster-submit-command bsub \
+        --cluster-results-workdir ~/mhcflurry-model-training \
+        --cluster-script-prefix-path ./cluster_submit_script_header.mssm_hpc.lsf
+done
+
+cp $SCRIPT_ABSOLUTE_PATH .
+bzip2 LOG.txt
+for i in $(ls LOG-worker.*.txt) ; do bzip2 $i ; done
+tar -cjf "../${DOWNLOAD_NAME}.tar.bz2" *
+echo "Created archive: $SCRATCH_DIR/${DOWNLOAD_NAME}.tar.bz2"
diff --git a/downloads-generation/models_class1_pan_unselected/cluster_submit_script_header.mssm_hpc.lsf b/downloads-generation/models_class1_pan_unselected/cluster_submit_script_header.mssm_hpc.lsf
new file mode 100644
index 00000000..14b5fcbc
--- /dev/null
+++ b/downloads-generation/models_class1_pan_unselected/cluster_submit_script_header.mssm_hpc.lsf
@@ -0,0 +1,15 @@
+#!/bin/bash
+#BSUB -J mhcf-{work_item_num} # Job name
+#BSUB -P acc_nkcancer # allocation account or Unix group
+#BSUB -q gpu # queue
+#BSUB -R span[hosts=1] # one node
+#BSUB -n 1 # number of compute cores
+#BSUB -W 36:00 # walltime in HH:MM
+#BSUB -R rusage[mem=60000] # mb memory requested
+#BSUB -o {work_dir}/%J.stdout # output log (%J : JobID)
+#BSUB -eo {work_dir}/%J.stderr # error log
+#BSUB -L /bin/bash # Initialize the execution environment
+
+set -e
+free -m
+cd {work_dir}
diff --git a/mhcflurry/calibrate_percentile_ranks_command.py b/mhcflurry/calibrate_percentile_ranks_command.py
index 74dee9d4..87243bdb 100644
--- a/mhcflurry/calibrate_percentile_ranks_command.py
+++ b/mhcflurry/calibrate_percentile_ranks_command.py
@@ -15,8 +15,8 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped)
 
@@ -54,7 +54,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
diff --git a/mhcflurry/parallelism.py b/mhcflurry/local_parallelism.py
similarity index 98%
rename from mhcflurry/parallelism.py
rename to mhcflurry/local_parallelism.py
index 3786b9c6..5a3d311b 100644
--- a/mhcflurry/parallelism.py
+++ b/mhcflurry/local_parallelism.py
@@ -13,8 +13,8 @@ import numpy
 from .common import set_keras_backend
 
 
-def add_worker_pool_args(parser):
-    group = parser.add_argument_group("Worker pool")
+def add_local_parallelism_args(parser):
+    group = parser.add_argument_group("Local parallelism")
 
     group.add_argument(
         "--num-jobs",
diff --git a/mhcflurry/select_allele_specific_models_command.py b/mhcflurry/select_allele_specific_models_command.py
index 22618555..ecfc5f23 100644
--- a/mhcflurry/select_allele_specific_models_command.py
+++ b/mhcflurry/select_allele_specific_models_command.py
@@ -22,7 +22,7 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .common import configure_logging, random_peptides
-from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
+from .local_parallelism import worker_pool_with_gpu_assignments_from_args, add_local_parallelism_args
 from .regression_target import from_ic50
 
 
@@ -176,7 +176,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)
 
 
 def run(argv=sys.argv[1:]):
diff --git a/mhcflurry/select_pan_allele_models_command.py b/mhcflurry/select_pan_allele_models_command.py
index c564e7c1..12f53a36 100644
--- a/mhcflurry/select_pan_allele_models_command.py
+++ b/mhcflurry/select_pan_allele_models_command.py
@@ -22,7 +22,7 @@ from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .allele_encoding import AlleleEncoding
 from .common import configure_logging, random_peptides
-from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
+from .local_parallelism import worker_pool_with_gpu_assignments_from_args, add_local_parallelism_args
 from .regression_target import from_ic50
 
 
@@ -82,7 +82,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)
 
 
 def mse(
diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py
index 2b5a5212..c1b057c8 100644
--- a/mhcflurry/train_allele_specific_models_command.py
+++ b/mhcflurry/train_allele_specific_models_command.py
@@ -21,8 +21,8 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped_kwargs)
 from .hyperparameters import HyperparameterDefaults
@@ -122,7 +122,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)
 
 TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(
     subset="all",
diff --git a/mhcflurry/train_pan_allele_models_command.py b/mhcflurry/train_pan_allele_models_command.py
index 1ec1118c..b5aaf36e 100644
--- a/mhcflurry/train_pan_allele_models_command.py
+++ b/mhcflurry/train_pan_allele_models_command.py
@@ -10,6 +10,8 @@ import traceback
 import random
 import pprint
 import hashlib
+import pickle
+import subprocess
 from functools import partial
 
 import numpy
@@ -22,10 +24,13 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .class1_neural_network import Class1NeuralNetwork
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped_kwargs)
+from .cluster_parallelism import (
+    add_cluster_parallelism_args,
+    cluster_results_from_args)
 from .allele_encoding import AlleleEncoding
 from .encodable_sequences import EncodableSequences
 
@@ -121,8 +126,8 @@ parser.add_argument(
     default=False,
     help="Launch python debugger on error")
 
-add_worker_pool_args(parser)
-
+add_local_parallelism_args(parser)
+add_cluster_parallelism_args(parser)
 
 def assign_folds(df, num_folds, held_out_fraction, held_out_max):
     result_df = pandas.DataFrame(index=df.index)
@@ -335,9 +340,6 @@ def main(args):
 
     start = time.time()
 
-    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
-    print("Worker pool", worker_pool)
-
     # The estimated time to completion is more accurate if we randomize
     # the order of the work.
     random.shuffle(work_items)
@@ -345,15 +347,41 @@ def main(args):
         item['work_item_num'] = work_item_num
         item['num_work_items'] = len(work_items)
 
-    if worker_pool:
-        print("Processing %d work items in parallel." % len(work_items))
-        assert not serial_run
-
-        results_generator = worker_pool.imap_unordered(
-            partial(call_wrapped_kwargs, train_model),
-            work_items,
-            chunksize=1)
-
+    if args.cluster_parallelism:
+        # Run using separate processes HPC cluster.
+        results_generator = cluster_results_from_args(
+            args,
+            work_function=train_model,
+            work_items=work_items,
+            constant_data=GLOBAL_DATA,
+            result_serialization_method="save_predictor")
+        worker_pool = None
+    else:
+        worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+        print("Worker pool", worker_pool)
+
+        if worker_pool:
+            print("Processing %d work items in parallel." % len(work_items))
+            assert not serial_run
+
+            results_generator = worker_pool.imap_unordered(
+                partial(call_wrapped_kwargs, train_model),
+                work_items,
+                chunksize=1)
+        else:
+            # Run in serial. In this case, every worker is passed the same predictor,
+            # which it adds models to, so no merging is required. It also saves
+            # as it goes so no saving is required at the end.
+            print("Processing %d work items in serial." % len(work_items))
+            assert serial_run
+            for _ in tqdm.trange(len(work_items)):
+                item = work_items.pop(0)  # want to keep freeing up memory
+                work_predictor = train_model(**item)
+                assert work_predictor is predictor
+            assert not work_items
+            results_generator = None
+
+    if results_generator:
         unsaved_predictors = []
         last_save_time = time.time()
         for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
@@ -379,18 +407,6 @@ def main(args):
 
         predictor.merge_in_place(unsaved_predictors)
 
-    else:
-        # Run in serial. In this case, every worker is passed the same predictor,
-        # which it adds models to, so no merging is required. It also saves
-        # as it goes so no saving is required at the end.
-        print("Processing %d work items in serial." % len(work_items))
-        assert serial_run
-        for _ in tqdm.trange(len(work_items)):
-            item = work_items.pop(0)  # want to keep freeing up memory
-            work_predictor = train_model(**item)
-            assert work_predictor is predictor
-        assert not work_items
-
     print("Saving final predictor to: %s" % args.out_models_dir)
     predictor.save(args.out_models_dir)  # write all models just to be sure
     print("Done.")
@@ -422,12 +438,13 @@ def train_model(
         verbose,
         progress_print_interval,
         predictor,
-        save_to):
+        save_to,
+        constant_data=GLOBAL_DATA):
 
-    df = GLOBAL_DATA["train_data"]
-    folds_df = GLOBAL_DATA["folds_df"]
-    allele_encoding = GLOBAL_DATA["allele_encoding"]
-    args = GLOBAL_DATA["args"]
+    df = constant_data["train_data"]
+    folds_df = constant_data["folds_df"]
+    allele_encoding = constant_data["allele_encoding"]
+    args = constant_data["args"]
 
     if predictor is None:
         predictor = Class1AffinityPredictor(
diff --git a/setup.py b/setup.py
index 173eb5a3..1a0cde43 100644
--- a/setup.py
+++ b/setup.py
@@ -87,6 +87,8 @@ if __name__ == '__main__':
                     'mhcflurry.select_pan_allele_models_command:run',
                 'mhcflurry-calibrate-percentile-ranks = '
                     'mhcflurry.calibrate_percentile_ranks_command:run',
+                '_mhcflurry-cluster-worker-entry-point = '
+                    'mhcflurry.cluster_parallelism:worker_entry_point',
             ]
         },
         classifiers=[
diff --git a/test/test_train_pan_allele_models_command.py b/test/test_train_pan_allele_models_command.py
index f98599ff..1ecbe85f 100644
--- a/test/test_train_pan_allele_models_command.py
+++ b/test/test_train_pan_allele_models_command.py
@@ -101,7 +101,7 @@ HYPERPARAMETERS_LIST = [
 ][1:]
 
 
-def run_and_check(n_jobs=0, delete=True):
+def run_and_check(n_jobs=0, delete=True, additional_args=[]):
     models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
     hyperparameters_filename = os.path.join(
         models_dir, "hyperparameters.yaml")
@@ -125,7 +125,7 @@ def run_and_check(n_jobs=0, delete=True):
         "--verbosity", "1",
         "--pretrain-data", get_path(
             "random_peptide_predictions", "predictions.csv.bz2"),
-    ]
+    ] + additional_args
     print("Running with args: %s" % args)
     subprocess.check_call(args)
 
@@ -144,7 +144,7 @@ def run_and_check(n_jobs=0, delete=True):
         print("Deleting: %s" % models_dir)
         shutil.rmtree(models_dir)
 
-
+"""
 if os.environ.get("KERAS_BACKEND") != "theano":
     def test_run_parallel():
         run_and_check(n_jobs=1)
@@ -153,6 +153,14 @@ if os.environ.get("KERAS_BACKEND") != "theano":
 
 def test_run_serial():
     run_and_check(n_jobs=0)
+"""
+
+
+def test_run_cluster_parallelism():
+    run_and_check(n_jobs=0, additional_args=[
+        '--cluster-parallelism',
+        '--cluster-results-workdir', '/tmp/'
+    ])
 
 
 if __name__ == "__main__":
-- 
GitLab