From aa05cea933db0b5f70ba0d1e771a3b9b17dbdf7f Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Tue, 20 Feb 2018 00:03:17 -0500
Subject: [PATCH] factor out parallelism-related argparse arguments
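
Adds add_worker_pool_args() and worker_pool_with_gpu_assignments_from_args()
to mhcflurry/parallelism.py and switches the calibrate-percentile-ranks,
select-allele-specific-models, and train-allele-specific-models commands to
use them instead of each defining --num-jobs, --backend, --gpus,
--max-workers-per-gpu, and --max-tasks-per-worker on its own.

Intended usage in a command looks roughly like the sketch below (do_work and
work_items are placeholders for each command's own task function and inputs):

    import argparse

    from mhcflurry.parallelism import (
        add_worker_pool_args,
        worker_pool_with_gpu_assignments_from_args)

    parser = argparse.ArgumentParser()
    add_worker_pool_args(parser)  # adds --num-jobs, --backend, --gpus,
                                  # --max-workers-per-gpu, --max-tasks-per-worker
    args = parser.parse_args()

    # None for a serial run (--num-jobs 1); otherwise a multiprocessing pool
    # whose workers are assigned GPUs round-robin at initialization.
    worker_pool = worker_pool_with_gpu_assignments_from_args(args)

    if worker_pool is None:
        # Serial run: do_work / work_items are hypothetical stand-ins here.
        results = (do_work(item) for item in work_items)
    else:
        results = worker_pool.imap_unordered(do_work, work_items)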

---
 .../models_class1_consensus/GENERATE.sh       |   9 +-
 .../calibrate_percentile_ranks_command.py     |  39 ++-----
 mhcflurry/parallelism.py                      | 101 ++++++++++++++++++
 .../select_allele_specific_models_command.py  |  27 ++---
 .../train_allele_specific_models_command.py   |  88 ++-------------
 5 files changed, 135 insertions(+), 129 deletions(-)

diff --git a/downloads-generation/models_class1_consensus/GENERATE.sh b/downloads-generation/models_class1_consensus/GENERATE.sh
index e4ded73d..f1db2b21 100755
--- a/downloads-generation/models_class1_consensus/GENERATE.sh
+++ b/downloads-generation/models_class1_consensus/GENERATE.sh
@@ -28,6 +28,9 @@ cd $SCRATCH_DIR/$DOWNLOAD_NAME
 
 mkdir models
 
+GPUS=$(nvidia-smi -L 2> /dev/null | wc -l) || GPUS=0
+echo "Detected GPUS: $GPUS"
+
 PROCESSORS=$(getconf _NPROCESSORS_ONLN)
 echo "Detected processors: $PROCESSORS"
 
@@ -35,12 +38,12 @@ time mhcflurry-class1-select-allele-specific-models \
     --models-dir "$(mhcflurry-downloads path models_class1_unselected)/models" \
     --out-models-dir models \
     --scoring consensus \
-    --num-jobs $(expr $PROCESSORS \* 2)
+    --num-jobs $(expr $PROCESSORS \* 2) --gpus $GPUS --max-workers-per-gpu 2 --max-tasks-per-worker 50
 
 time mhcflurry-calibrate-percentile-ranks \
     --models-dir models \
-    --num-jobs $(expr $PROCESSORS \* 2)
-    --num-peptides-per-length 100000
+    --num-peptides-per-length 100000 \
+    --num-jobs $(expr $PROCESSORS \* 2) --gpus $GPUS --max-workers-per-gpu 2 --max-tasks-per-worker 50
 
 cp $SCRIPT_ABSOLUTE_PATH .
 bzip2 LOG.txt
diff --git a/mhcflurry/calibrate_percentile_ranks_command.py b/mhcflurry/calibrate_percentile_ranks_command.py
index 073f7ff5..4a986e5c 100644
--- a/mhcflurry/calibrate_percentile_ranks_command.py
+++ b/mhcflurry/calibrate_percentile_ranks_command.py
@@ -21,7 +21,9 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
 from .parallelism import (
-    make_worker_pool, call_wrapped)
+    add_worker_pool_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped)
 
 
 # To avoid pickling large matrices to send to child processes when running in
@@ -33,7 +35,6 @@ GLOBAL_DATA = {}
 
 parser = argparse.ArgumentParser(usage=__doc__)
 
-
 parser.add_argument(
     "--models-dir",
     metavar="DIR",
@@ -52,26 +53,13 @@ parser.add_argument(
     default=int(1e5),
     help="Number of peptides per length to use to calibrate percent ranks. "
     "Default: %(default)s.")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize over. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--max-tasks-per-worker",
-    type=int,
-    metavar="N",
-    default=None,
-    help="Restart workers after N tasks. Workaround for tensorflow memory "
-    "leaks. Requires Python >=3.2.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
+add_worker_pool_args(parser)
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -109,10 +97,15 @@ def run(argv=sys.argv[1:]):
         time.time() - start))
     print("Calibrating percent rank calibration for %d alleles." % len(alleles))
 
-    if args.num_jobs == 1:
+    # Store peptides in global variable so they are in shared memory
+    # after fork, instead of needing to be pickled (when doing a parallel run).
+    GLOBAL_DATA["calibration_peptides"] = encoded_peptides
+
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+
+    if worker_pool is None:
         # Serial run
         print("Running in serial.")
-        worker_pool = None
         results = (
             calibrate_percentile_ranks(
                 allele=allele,
@@ -121,16 +114,6 @@ def run(argv=sys.argv[1:]):
             for allele in alleles)
     else:
         # Parallel run
-        # Store peptides in global variable so they are in shared memory
-        # after fork, instead of needing to be pickled.
-        GLOBAL_DATA["calibration_peptides"] = encoded_peptides
-
-        worker_pool = make_worker_pool(
-            processes=(
-                args.num_jobs
-                if args.num_jobs else None),
-            max_tasks_per_worker=args.max_tasks_per_worker)
-
         results = worker_pool.imap_unordered(
             partial(
                 partial(call_wrapped, calibrate_percentile_ranks),
diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py
index 10da1f80..ccf9d005 100644
--- a/mhcflurry/parallelism.py
+++ b/mhcflurry/parallelism.py
@@ -1,5 +1,6 @@
 import traceback
 import sys
+import os
 from multiprocessing import Pool, Queue, cpu_count
 from six.moves import queue
 from multiprocessing.util import Finalize
@@ -11,6 +12,106 @@ import numpy
 from .common import set_keras_backend
 
 
+def add_worker_pool_args(parser):
+    group = parser.add_argument_group("Worker pool")
+
+    group.add_argument(
+        "--num-jobs",
+        default=1,
+        type=int,
+        metavar="N",
+        help="Number of processes to parallelize training over. Experimental. "
+             "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
+    group.add_argument(
+        "--backend",
+        choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
+        help="Keras backend. If not specified will use system default.")
+    group.add_argument(
+        "--gpus",
+        type=int,
+        metavar="N",
+        help="Number of GPUs to attempt to parallelize across. Requires running "
+             "in parallel.")
+    group.add_argument(
+        "--max-workers-per-gpu",
+        type=int,
+        metavar="N",
+        default=1000,
+        help="Maximum number of workers to assign to a GPU. Additional tasks will "
+             "run on CPU.")
+    group.add_argument(
+        "--max-tasks-per-worker",
+        type=int,
+        metavar="N",
+        default=None,
+        help="Restart workers after N tasks. Workaround for tensorflow memory "
+             "leaks. Requires Python >=3.2.")
+
+
+def worker_pool_with_gpu_assignments_from_args(args):
+    return worker_pool_with_gpu_assignments(
+        num_jobs=args.num_jobs,
+        num_gpus=args.gpus,
+        backend=args.backend,
+        max_workers_per_gpu=args.max_workers_per_gpu,
+        max_tasks_per_worker=args.max_tasks_per_worker
+    )
+
+
+def worker_pool_with_gpu_assignments(
+        num_jobs,
+        num_gpus=0,
+        backend=None,
+        max_workers_per_gpu=1,
+        max_tasks_per_worker=None):
+
+    num_workers = num_jobs if num_jobs else cpu_count()
+
+    if num_workers == 1:
+        if backend:
+            set_keras_backend(backend)
+        return None
+
+    worker_init_kwargs = None
+    if num_gpus:
+        print("Attempting to round-robin assign each worker a GPU.")
+        if backend != "tensorflow-default":
+            print("Forcing keras backend to be tensorflow-default")
+            backend = "tensorflow-default"
+
+        gpu_assignments_remaining = dict((
+            (gpu, max_workers_per_gpu) for gpu in range(num_gpus)
+        ))
+        worker_init_kwargs = []
+        for worker_num in range(num_workers):
+            if gpu_assignments_remaining:
+                # Use a GPU
+                gpu_num = sorted(
+                    gpu_assignments_remaining,
+                    key=lambda key: gpu_assignments_remaining[key])[0]
+                gpu_assignments_remaining[gpu_num] -= 1
+                if not gpu_assignments_remaining[gpu_num]:
+                    del gpu_assignments_remaining[gpu_num]
+                gpu_assignment = [gpu_num]
+            else:
+                # Use CPU
+                gpu_assignment = []
+
+            worker_init_kwargs.append({
+                'gpu_device_nums': gpu_assignment,
+                'keras_backend': backend
+            })
+            print("Worker %d assigned GPUs: %s" % (
+                worker_num, gpu_assignment))
+
+    worker_pool = make_worker_pool(
+        processes=num_workers,
+        initializer=worker_init,
+        initializer_kwargs_per_process=worker_init_kwargs,
+        max_tasks_per_worker=max_tasks_per_worker)
+    return worker_pool
+
+
 def make_worker_pool(
         processes=None,
         initializer=None,
diff --git a/mhcflurry/select_allele_specific_models_command.py b/mhcflurry/select_allele_specific_models_command.py
index e374b050..722c82ab 100644
--- a/mhcflurry/select_allele_specific_models_command.py
+++ b/mhcflurry/select_allele_specific_models_command.py
@@ -20,7 +20,7 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .common import configure_logging, random_peptides
-from .parallelism import make_worker_pool
+from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
 from .regression_target import from_ic50
 
 
@@ -93,23 +93,14 @@ parser.add_argument(
     type=int,
     default=100000,
     help="Num peptides per length to use for consensus scoring")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize selection over. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--backend",
-    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
-    help="Keras backend. If not specified will use system default.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
+add_worker_pool_args(parser)
+
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -208,19 +199,17 @@ def run(argv=sys.argv[1:]):
     metadata_dfs["model_selection_data"] = df
     result_predictor = Class1AffinityPredictor(metadata_dataframes=metadata_dfs)
 
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+
     start = time.time()
-    if args.num_jobs == 1:
+
+    if worker_pool is None:
         # Serial run
         print("Running in serial.")
-        worker_pool = None
         results = (
             model_select(allele) for allele in alleles)
     else:
-        worker_pool = make_worker_pool(
-            processes=(
-                args.num_jobs
-                if args.num_jobs else None))
-
+        # Parallel run
         random.shuffle(alleles)
         results = worker_pool.imap_unordered(
             model_select,
diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py
index d3d2adfc..fdc1ae22 100644
--- a/mhcflurry/train_allele_specific_models_command.py
+++ b/mhcflurry/train_allele_specific_models_command.py
@@ -21,7 +21,9 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging, set_keras_backend
 from .parallelism import (
-    make_worker_pool, cpu_count, call_wrapped_kwargs, worker_init)
+    add_worker_pool_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped_kwargs)
 from .hyperparameters import HyperparameterDefaults
 from .allele_encoding import AlleleEncoding
 
@@ -58,12 +60,14 @@ parser.add_argument(
     metavar="FILE.json",
     required=True,
     help="JSON or YAML of hyperparameters")
+
 parser.add_argument(
     "--allele",
     default=None,
     nargs="+",
     help="Alleles to train models for. If not specified, all alleles with "
     "enough measurements will be used.")
+
 parser.add_argument(
     "--min-measurements-per-allele",
     type=int,
@@ -76,7 +80,7 @@ parser.add_argument(
     metavar="N",
     default=None,
     help="Hold out 1/N fraction of data (for e.g. subsequent model selection. "
-    "For example, specify 5 to hold out 20% of the data.")
+    "For example, specify 5 to hold out 20 percent of the data.")
 parser.add_argument(
     "--held-out-fraction-seed",
     type=int,
@@ -106,30 +110,6 @@ parser.add_argument(
     "--allele-sequences",
     metavar="FILE.csv",
     help="Allele sequences file. Used for computing allele similarity matrix.")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize training over. Experimental. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--backend",
-    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
-    help="Keras backend. If not specified will use system default.")
-parser.add_argument(
-    "--gpus",
-    type=int,
-    metavar="N",
-    help="Number of GPUs to attempt to parallelize across. Requires running "
-    "in parallel.")
-parser.add_argument(
-    "--max-workers-per-gpu",
-    type=int,
-    metavar="N",
-    default=1000,
-    help="Maximum number of workers to assign to a GPU. Additional tasks will "
-    "run on CPU.")
 parser.add_argument(
     "--save-interval",
     type=float,
@@ -137,19 +117,13 @@ parser.add_argument(
     default=60,
     help="Write models to disk every N seconds. Only affects parallel runs; "
     "serial runs write each model to disk as it is trained.")
-parser.add_argument(
-    "--max-tasks-per-worker",
-    type=int,
-    metavar="N",
-    default=None,
-    help="Restart workers after N tasks. Workaround for tensorflow memory "
-    "leaks. Requires Python >=3.2.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
 
+add_worker_pool_args(parser)
 
 TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(
     subset="all",
@@ -286,52 +260,8 @@ def run(argv=sys.argv[1:]):
                 work_items.append(work_dict)
 
     start = time.time()
-    if serial_run:
-        # Serial run.
-        print("Running in serial.")
-        worker_pool = None
-        if args.backend:
-            set_keras_backend(args.backend)
-    else:
-        # Parallel run.
-        num_workers = args.num_jobs if args.num_jobs else cpu_count()
-        worker_init_kwargs = None
-        if args.gpus:
-            print("Attempting to round-robin assign each worker a GPU.")
-            if args.backend != "tensorflow-default":
-                print("Forcing keras backend to be tensorflow-default")
-                args.backend = "tensorflow-default"
-
-            gpu_assignments_remaining = dict((
-                (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
-            ))
-            worker_init_kwargs = []
-            for worker_num in range(num_workers):
-                if gpu_assignments_remaining:
-                    # Use a GPU
-                    gpu_num = sorted(
-                        gpu_assignments_remaining,
-                        key=lambda key: gpu_assignments_remaining[key])[0]
-                    gpu_assignments_remaining[gpu_num] -= 1
-                    if not gpu_assignments_remaining[gpu_num]:
-                        del gpu_assignments_remaining[gpu_num]
-                    gpu_assignment = [gpu_num]
-                else:
-                    # Use CPU
-                    gpu_assignment = []
-
-                worker_init_kwargs.append({
-                    'gpu_device_nums': gpu_assignment,
-                    'keras_backend': args.backend
-                })
-                print("Worker %d assigned GPUs: %s" % (
-                    worker_num, gpu_assignment))
-
-        worker_pool = make_worker_pool(
-            processes=num_workers,
-            initializer=worker_init,
-            initializer_kwargs_per_process=worker_init_kwargs,
-            max_tasks_per_worker=args.max_tasks_per_worker)
+
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
 
     if worker_pool:
         print("Processing %d work items in parallel." % len(work_items))
-- 
GitLab