diff --git a/mhcflurry/common.py b/mhcflurry/common.py index a699235f711577650e1939afe2bbaf39f5cd37ed..3bd2390cf2d77240aeb8626abd3007b37911fffa 100644 --- a/mhcflurry/common.py +++ b/mhcflurry/common.py @@ -10,16 +10,28 @@ import pandas from . import amino_acid -def set_keras_backend(backend): +def set_keras_backend(backend=None, gpu_device_nums=None): """ Configure Keras backend to use GPU or CPU. Only tensorflow is supported. - Must be called before Keras has been imported. + Parameters + ---------- + backend : string, optional + one of 'tensorflow-default', 'tensorflow-cpu', 'tensorflow-gpu' + + gpu_device_nums : list of int, optional + GPU devices to potentially use - backend must be 'tensorflow-cpu' or 'tensorflow-gpu'. """ os.environ["KERAS_BACKEND"] = "tensorflow" + if not backend: + backend = "tensorflow-default" + + if gpu_device_nums is not None: + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join( + [str(i) for i in gpu_device_nums]) + if backend == "tensorflow-cpu": print("Forcing tensorflow/CPU backend.") os.environ["CUDA_VISIBLE_DEVICES"] = "" diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py index 07025dac74db2dfbc9f0c4520d3aaa3281778b74..7d42c9faa8745c4307f9438a2945342d2a02c107 100644 --- a/mhcflurry/train_allele_specific_models_command.py +++ b/mhcflurry/train_allele_specific_models_command.py @@ -7,8 +7,7 @@ import signal import sys import time import traceback -import itertools -from multiprocessing import Pool, Queue +from multiprocessing import Pool, Queue, cpu_count from functools import partial from pprint import pprint @@ -123,6 +122,13 @@ parser.add_argument( metavar="N", help="Number of GPUs to attempt to parallelize across. Requires running " "in parallel.") +parser.add_argument( + "--max-workers-per-gpu", + type=int, + metavar="N", + default=1000, + help="Maximum number of workers to assign to a GPU. Additional tasks will " + "run on CPU.") def run(argv=sys.argv[1:]): @@ -186,32 +192,39 @@ def run(argv=sys.argv[1:]): else: # Parallel run. - env_queue = None + num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count() + + worker_init_args = None if args.gpus: print("Attempting to round-robin assign each worker a GPU.") - # We assign each worker to a GPU using the CUDA_VISIBLE_DEVICES - # environment variable. To do this, we push environment variables - # onto a queue. Each worker reads a single item from the queue, - # which is a list of environment variables to set. - cpus = 16 - next_device = itertools.cycle([ - "%d" % num for num in range(args.gpus) - ] + ["" for num in range(cpus)]) - env_queue = Queue() - for num in range(args.num_jobs[0]): - item = [ - ("CUDA_VISIBLE_DEVICES", next(next_device)), - ] - env_queue.put(item) + gpu_assignments_remaining = dict(( + (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus) + )) + worker_init_args = Queue() + for worker_num in range(num_workers): + if gpu_assignments_remaining: + # Use a GPU + gpu_num = sorted( + gpu_assignments_remaining, + key=lambda key: gpu_assignments_remaining[key])[0] + gpu_assignments_remaining[gpu_num] -= 1 + if not gpu_assignments_remaining[gpu_num]: + del gpu_assignments_remaining[gpu_num] + else: + # Use CPU + gpu_assignment = [] + + worker_init_args.put({ + 'gpu_device_nums': gpu_assignment, + 'keras_backend': args.backend + }) worker_pool = Pool( initializer=worker_init, - initargs=(env_queue,), - processes=( - args.num_jobs[0] - if args.num_jobs[0] else None)) - print("Using worker pool: %s" % str(worker_pool)) + initargs=(worker_init_args,), + processes=num_workers) + print("Started pool of %d workers: %s" % (num_workers, str(worker_pool))) if args.out_models_dir and not os.path.exists(args.out_models_dir): print("Attempting to create directory: %s" % args.out_models_dir) @@ -434,19 +447,21 @@ def calibrate_percentile_ranks(allele, predictor, peptides=None): } -def worker_init(env_queue=None): - global GLOBAL_DATA +def worker_init_entrypoint(arg_queue): + if arg_queue: + (args, kwargs) = arg_queue.get() + else: + args = [] + kwargs = {} + worker_init(*args, **kwargs) + - # The env_queue provides a way for each worker to be configured with a - # specific set of environment variables. We use it to assign GPUs to workers. - if env_queue: - settings = env_queue.get() - print("Setting: ", settings) - os.environ.update(settings) - - command_args = GLOBAL_DATA['args'] - if command_args.backend: - set_keras_backend(command_args.backend) +def worker_init(keras_backend=None, gpu_device_nums=None): + if keras_backend or gpu_device_nums: + print("WORKER pid=%d assigned GPU devices: %s" % ( + os.getpgid()), gpu_device_nums) + set_keras_backend( + keras_backend, gpu_device_nums=gpu_device_nums) if __name__ == '__main__': run()