From a9ec7e1cf2afb702114bac96703fb759cc7bd7cd Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Fri, 9 Feb 2018 19:14:52 -0500
Subject: [PATCH] better gpu/cpu allocation

---
 mhcflurry/common.py                           | 18 +++-
 .../train_allele_specific_models_command.py   | 83 +++++++++++--------
 2 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/mhcflurry/common.py b/mhcflurry/common.py
index a699235f..3bd2390c 100644
--- a/mhcflurry/common.py
+++ b/mhcflurry/common.py
@@ -10,16 +10,28 @@ import pandas
 from . import amino_acid
 
 
-def set_keras_backend(backend):
+def set_keras_backend(backend=None, gpu_device_nums=None):
     """
     Configure Keras backend to use GPU or CPU. Only tensorflow is supported.
 
-    Must be called before Keras has been imported.
+    Parameters
+    ----------
+    backend : string, optional
+        one of 'tensorflow-default', 'tensorflow-cpu', 'tensorflow-gpu'
+
+    gpu_device_nums : list of int, optional
+        GPU devices to potentially use
 
-    backend must be 'tensorflow-cpu' or 'tensorflow-gpu'.
     """
     os.environ["KERAS_BACKEND"] = "tensorflow"
 
+    if not backend:
+        backend = "tensorflow-default"
+
+    if gpu_device_nums is not None:
+        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
+            [str(i) for i in gpu_device_nums])
+
     if backend == "tensorflow-cpu":
         print("Forcing tensorflow/CPU backend.")
         os.environ["CUDA_VISIBLE_DEVICES"] = ""
diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py
index 07025dac..7d42c9fa 100644
--- a/mhcflurry/train_allele_specific_models_command.py
+++ b/mhcflurry/train_allele_specific_models_command.py
@@ -7,8 +7,7 @@ import signal
 import sys
 import time
 import traceback
-import itertools
-from multiprocessing import Pool, Queue
+from multiprocessing import Pool, Queue, cpu_count
 from functools import partial
 from pprint import pprint
 
@@ -123,6 +122,13 @@ parser.add_argument(
     metavar="N",
     help="Number of GPUs to attempt to parallelize across. Requires running "
     "in parallel.")
+parser.add_argument(
+    "--max-workers-per-gpu",
+    type=int,
+    metavar="N",
+    default=1000,
+    help="Maximum number of workers to assign to a GPU. Additional tasks will "
+    "run on CPU.")
 
 
 def run(argv=sys.argv[1:]):
@@ -186,32 +192,39 @@ def run(argv=sys.argv[1:]):
 
     else:
         # Parallel run.
-        env_queue = None
+        num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count()
+
+        worker_init_args = None
         if args.gpus:
             print("Attempting to round-robin assign each worker a GPU.")
 
-            # We assign each worker to a GPU using the CUDA_VISIBLE_DEVICES
-            # environment variable. To do this, we push environment variables
-            # onto a queue. Each worker reads a single item from the queue,
-            # which is a list of environment variables to set.
-            cpus = 16
-            next_device = itertools.cycle([
-                "%d" % num for num in range(args.gpus)
-            ] + ["" for num in range(cpus)])
-            env_queue = Queue()
-            for num in range(args.num_jobs[0]):
-                item = [
-                    ("CUDA_VISIBLE_DEVICES", next(next_device)),
-                ]
-                env_queue.put(item)
+            gpu_assignments_remaining = dict((
+                (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
+            ))
+            worker_init_args = Queue()
+            for worker_num in range(num_workers):
+                if gpu_assignments_remaining:
+                    # Use a GPU
+                    gpu_num = sorted(
+                        gpu_assignments_remaining,
+                        key=lambda key: gpu_assignments_remaining[key])[0]
+                    gpu_assignments_remaining[gpu_num] -= 1
+                    if not gpu_assignments_remaining[gpu_num]:
+                        del gpu_assignments_remaining[gpu_num]
+                else:
+                    # Use CPU
+                    gpu_assignment = []
+
+                worker_init_args.put({
+                    'gpu_device_nums': gpu_assignment,
+                    'keras_backend': args.backend
+                })
 
         worker_pool = Pool(
             initializer=worker_init,
-            initargs=(env_queue,),
-            processes=(
-                args.num_jobs[0]
-                if args.num_jobs[0] else None))
-        print("Using worker pool: %s" % str(worker_pool))
+            initargs=(worker_init_args,),
+            processes=num_workers)
+        print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))
 
     if args.out_models_dir and not os.path.exists(args.out_models_dir):
         print("Attempting to create directory: %s" % args.out_models_dir)
@@ -434,19 +447,21 @@ def calibrate_percentile_ranks(allele, predictor, peptides=None):
     }
 
 
-def worker_init(env_queue=None):
-    global GLOBAL_DATA
+def worker_init_entrypoint(arg_queue):
+    if arg_queue:
+        (args, kwargs) = arg_queue.get()
+    else:
+        args = []
+        kwargs = {}
+    worker_init(*args, **kwargs)
+
 
-    # The env_queue provides a way for each worker to be configured with a
-    # specific set of environment variables. We use it to assign GPUs to workers.
-    if env_queue:
-        settings = env_queue.get()
-        print("Setting: ", settings)
-        os.environ.update(settings)
-        
-    command_args = GLOBAL_DATA['args']
-    if command_args.backend:
-        set_keras_backend(command_args.backend)
+def worker_init(keras_backend=None, gpu_device_nums=None):
+    if keras_backend or gpu_device_nums:
+        print("WORKER pid=%d assigned GPU devices: %s" % (
+            os.getpgid()), gpu_device_nums)
+        set_keras_backend(
+            keras_backend, gpu_device_nums=gpu_device_nums)
 
 if __name__ == '__main__':
     run()
-- 
GitLab