Commit a9ec7e1c authored by Tim O'Donnell

better gpu/cpu allocation

parent 4ea2b240
@@ -10,16 +10,28 @@ import pandas
 
 from . import amino_acid
 
 
-def set_keras_backend(backend):
+def set_keras_backend(backend=None, gpu_device_nums=None):
     """
     Configure Keras backend to use GPU or CPU. Only tensorflow is supported.
-    Must be called before Keras has been imported.
 
-    backend must be 'tensorflow-cpu' or 'tensorflow-gpu'.
+    Parameters
+    ----------
+    backend : string, optional
+        one of 'tensorflow-default', 'tensorflow-cpu', 'tensorflow-gpu'
+
+    gpu_device_nums : list of int, optional
+        GPU devices to potentially use
     """
     os.environ["KERAS_BACKEND"] = "tensorflow"
+
+    if not backend:
+        backend = "tensorflow-default"
+
+    if gpu_device_nums is not None:
+        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
+            [str(i) for i in gpu_device_nums])
+
     if backend == "tensorflow-cpu":
         print("Forcing tensorflow/CPU backend.")
         os.environ["CUDA_VISIBLE_DEVICES"] = ""
@@ -7,8 +7,7 @@ import signal
 import sys
 import time
 import traceback
-import itertools
-from multiprocessing import Pool, Queue
+from multiprocessing import Pool, Queue, cpu_count
 from functools import partial
 from pprint import pprint
@@ -123,6 +122,13 @@ parser.add_argument(
     metavar="N",
     help="Number of GPUs to attempt to parallelize across. Requires running "
     "in parallel.")
+parser.add_argument(
+    "--max-workers-per-gpu",
+    type=int,
+    metavar="N",
+    default=1000,
+    help="Maximum number of workers to assign to a GPU. Additional tasks will "
+    "run on CPU.")
 
 
 def run(argv=sys.argv[1:]):
@@ -186,32 +192,39 @@ def run(argv=sys.argv[1:]):
     else:
         # Parallel run.
-        env_queue = None
+        num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count()
+        worker_init_args = None
         if args.gpus:
             print("Attempting to round-robin assign each worker a GPU.")
-            # We assign each worker to a GPU using the CUDA_VISIBLE_DEVICES
-            # environment variable. To do this, we push environment variables
-            # onto a queue. Each worker reads a single item from the queue,
-            # which is a list of environment variables to set.
-            cpus = 16
-            next_device = itertools.cycle([
-                "%d" % num for num in range(args.gpus)
-            ] + ["" for num in range(cpus)])
-            env_queue = Queue()
-            for num in range(args.num_jobs[0]):
-                item = [
-                    ("CUDA_VISIBLE_DEVICES", next(next_device)),
-                ]
-                env_queue.put(item)
+            gpu_assignments_remaining = dict((
+                (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
+            ))
+            worker_init_args = Queue()
+            for worker_num in range(num_workers):
+                if gpu_assignments_remaining:
+                    # Use a GPU
+                    gpu_num = sorted(
+                        gpu_assignments_remaining,
+                        key=lambda key: gpu_assignments_remaining[key])[0]
+                    gpu_assignments_remaining[gpu_num] -= 1
+                    if not gpu_assignments_remaining[gpu_num]:
+                        del gpu_assignments_remaining[gpu_num]
+                    gpu_assignment = [gpu_num]
+                else:
+                    # Use CPU
+                    gpu_assignment = []
+                worker_init_args.put({
+                    'gpu_device_nums': gpu_assignment,
+                    'keras_backend': args.backend
+                })
 
         worker_pool = Pool(
-            initializer=worker_init,
-            initargs=(env_queue,),
-            processes=(
-                args.num_jobs[0]
-                if args.num_jobs[0] else None))
-        print("Using worker pool: %s" % str(worker_pool))
+            initializer=worker_init_entrypoint,
+            initargs=(worker_init_args,),
+            processes=num_workers)
+        print("Started pool of %d workers: %s" % (num_workers, str(worker_pool)))
 
     if args.out_models_dir and not os.path.exists(args.out_models_dir):
         print("Attempting to create directory: %s" % args.out_models_dir)
@@ -434,19 +447,21 @@ def calibrate_percentile_ranks(allele, predictor, peptides=None):
     }
 
 
-def worker_init(env_queue=None):
-    global GLOBAL_DATA
+def worker_init_entrypoint(arg_queue):
+    kwargs = {}
+    if arg_queue:
+        kwargs = arg_queue.get()
+    worker_init(**kwargs)
 
-    # The env_queue provides a way for each worker to be configured with a
-    # specific set of environment variables. We use it to assign GPUs to workers.
-    if env_queue:
-        settings = env_queue.get()
-        print("Setting: ", settings)
-        os.environ.update(settings)
 
-    command_args = GLOBAL_DATA['args']
-    if command_args.backend:
-        set_keras_backend(command_args.backend)
+def worker_init(keras_backend=None, gpu_device_nums=None):
+    if keras_backend or gpu_device_nums:
+        print("WORKER pid=%d assigned GPU devices: %s" % (
+            os.getpid(), gpu_device_nums))
+        set_keras_backend(
+            keras_backend, gpu_device_nums=gpu_device_nums)
 
 
 if __name__ == '__main__':
     run()
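
The overall pattern, sketched standalone below (all names illustrative, not the library's API): a multiprocessing.Queue of per-worker kwargs dicts is drained by each worker's initializer, so each pool process configures its own GPU visibility exactly once at startup.

    import os
    from multiprocessing import Pool, Queue

    def init_worker(arg_queue):
        # Each worker pops exactly one kwargs dict at startup.
        kwargs = arg_queue.get() if arg_queue else {}
        devices = kwargs.get('gpu_device_nums', [])
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in devices)

    def work(x):
        return (os.getpid(), os.environ.get("CUDA_VISIBLE_DEVICES"), x * x)

    if __name__ == '__main__':
        q = Queue()
        for assignment in ([0], [1], []):  # two GPU workers, one CPU-only worker
            q.put({'gpu_device_nums': assignment})
        with Pool(processes=3, initializer=init_worker, initargs=(q,)) as pool:
            print(pool.map(work, range(6)))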