Commit aa05cea9 authored by Tim O'Donnell

factor out parallelism related argparse arguments

parent 7dae2aad

@@ -28,6 +28,9 @@ cd $SCRATCH_DIR/$DOWNLOAD_NAME
 mkdir models
 
+GPUS=$(nvidia-smi -L 2> /dev/null | wc -l) || GPUS=0
+echo "Detected GPUS: $GPUS"
+
 PROCESSORS=$(getconf _NPROCESSORS_ONLN)
 echo "Detected processors: $PROCESSORS"
@@ -35,12 +38,12 @@ time mhcflurry-class1-select-allele-specific-models \
     --models-dir "$(mhcflurry-downloads path models_class1_unselected)/models" \
     --out-models-dir models \
     --scoring consensus \
-    --num-jobs $(expr $PROCESSORS \* 2)
+    --num-jobs $(expr $PROCESSORS \* 2) --gpus $GPUS --max-workers-per-gpu 2 --max-tasks-per-worker 50
 
 time mhcflurry-calibrate-percentile-ranks \
     --models-dir models \
-    --num-jobs $(expr $PROCESSORS \* 2) \
-    --num-peptides-per-length 100000
+    --num-peptides-per-length 100000 \
+    --num-jobs $(expr $PROCESSORS \* 2) --gpus $GPUS --max-workers-per-gpu 2 --max-tasks-per-worker 50
 
 cp $SCRIPT_ABSOLUTE_PATH .
 bzip2 LOG.txt
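The new GPUS detection counts the lines that nvidia-smi -L prints (one per device), falling back to zero when the tool is absent or fails. A rough Python equivalent of the same idiom, shown only for illustration (the function name is not part of this commit):

import subprocess

def detect_gpus():
    # Mirrors GPUS=$(nvidia-smi -L 2> /dev/null | wc -l) || GPUS=0 from the
    # script above: "nvidia-smi -L" prints one line per GPU; if the binary
    # is missing or exits nonzero, report zero GPUs instead of raising.
    try:
        output = subprocess.check_output(
            ["nvidia-smi", "-L"], stderr=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError):
        return 0
    return len(output.strip().splitlines())

print(detect_gpus())  # 0 on a machine without NVIDIA GPUs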
...
@@ -21,7 +21,9 @@ tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
 from .parallelism import (
-    make_worker_pool, call_wrapped)
+    add_worker_pool_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped)
 
 # To avoid pickling large matrices to send to child processes when running in
@@ -33,7 +35,6 @@ GLOBAL_DATA = {}
 parser = argparse.ArgumentParser(usage=__doc__)
-
 parser.add_argument(
     "--models-dir",
     metavar="DIR",
@@ -52,26 +53,13 @@ parser.add_argument(
     default=int(1e5),
     help="Number of peptides per length to use to calibrate percent ranks. "
     "Default: %(default)s.")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize over. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--max-tasks-per-worker",
-    type=int,
-    metavar="N",
-    default=None,
-    help="Restart workers after N tasks. Workaround for tensorflow memory "
-    "leaks. Requires Python >=3.2.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
+add_worker_pool_args(parser)
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -109,10 +97,15 @@ def run(argv=sys.argv[1:]):
         time.time() - start))
 
     print("Calibrating percent rank calibration for %d alleles." % len(alleles))
-    if args.num_jobs == 1:
+    # Store peptides in global variable so they are in shared memory
+    # after fork, instead of needing to be pickled (when doing a parallel run).
+    GLOBAL_DATA["calibration_peptides"] = encoded_peptides
+
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+
+    if worker_pool is None:
         # Serial run
         print("Running in serial.")
-        worker_pool = None
         results = (
             calibrate_percentile_ranks(
                 allele=allele,
@@ -121,16 +114,6 @@ def run(argv=sys.argv[1:]):
             for allele in alleles)
     else:
         # Parallel run
-        # Store peptides in global variable so they are in shared memory
-        # after fork, instead of needing to be pickled.
-        GLOBAL_DATA["calibration_peptides"] = encoded_peptides
-        worker_pool = make_worker_pool(
-            processes=(
-                args.num_jobs
-                if args.num_jobs else None),
-            max_tasks_per_worker=args.max_tasks_per_worker)
         results = worker_pool.imap_unordered(
             partial(
                 partial(call_wrapped, calibrate_percentile_ranks),
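The relocated GLOBAL_DATA assignment is the key to the calibrate command's memory behavior: anything staged in a module-level global before the pool is created is inherited by forked workers and never pickled per task. A minimal self-contained sketch of the pattern (it assumes the POSIX "fork" start method; the peptide data here is fabricated for illustration):

from multiprocessing import Pool

GLOBAL_DATA = {}

def peptide_count(key):
    # Forked workers inherit GLOBAL_DATA; nothing is pickled per task.
    return len(GLOBAL_DATA[key])

if __name__ == "__main__":
    # Stage the large object *before* creating the pool, as the command
    # above now does for its encoded calibration peptides.
    GLOBAL_DATA["calibration_peptides"] = ["SIINFEKL"] * 10**6
    with Pool(2) as pool:
        for count in pool.imap_unordered(peptide_count, ["calibration_peptides"]):
            print(count)  # -> 1000000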
...
 import traceback
 import sys
+import os
 from multiprocessing import Pool, Queue, cpu_count
 from six.moves import queue
 from multiprocessing.util import Finalize
@@ -11,6 +12,106 @@ import numpy
 from .common import set_keras_backend
 
 
+def add_worker_pool_args(parser):
+    group = parser.add_argument_group("Worker pool")
+    group.add_argument(
+        "--num-jobs",
+        default=1,
+        type=int,
+        metavar="N",
+        help="Number of processes to parallelize training over. Experimental. "
+        "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
+    group.add_argument(
+        "--backend",
+        choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
+        help="Keras backend. If not specified will use system default.")
+    group.add_argument(
+        "--gpus",
+        type=int,
+        metavar="N",
+        help="Number of GPUs to attempt to parallelize across. Requires running "
+        "in parallel.")
+    group.add_argument(
+        "--max-workers-per-gpu",
+        type=int,
+        metavar="N",
+        default=1000,
+        help="Maximum number of workers to assign to a GPU. Additional tasks will "
+        "run on CPU.")
+    group.add_argument(
+        "--max-tasks-per-worker",
+        type=int,
+        metavar="N",
+        default=None,
+        help="Restart workers after N tasks. Workaround for tensorflow memory "
+        "leaks. Requires Python >=3.2.")
+
+
+def worker_pool_with_gpu_assignments_from_args(args):
+    return worker_pool_with_gpu_assignments(
+        num_jobs=args.num_jobs,
+        num_gpus=args.gpus,
+        backend=args.backend,
+        max_workers_per_gpu=args.max_workers_per_gpu,
+        max_tasks_per_worker=args.max_tasks_per_worker
+    )
+
+
+def worker_pool_with_gpu_assignments(
+        num_jobs,
+        num_gpus=0,
+        backend=None,
+        max_workers_per_gpu=1,
+        max_tasks_per_worker=None):
+
+    num_workers = num_jobs if num_jobs else cpu_count()
+
+    if num_workers == 1:
+        if backend:
+            set_keras_backend(backend)
+        return None
+
+    worker_init_kwargs = None
+    if num_gpus:
+        print("Attempting to round-robin assign each worker a GPU.")
+        if backend != "tensorflow-default":
+            print("Forcing keras backend to be tensorflow-default")
+            backend = "tensorflow-default"
+        gpu_assignments_remaining = dict((
+            (gpu, max_workers_per_gpu) for gpu in range(num_gpus)
+        ))
+        worker_init_kwargs = []
+        for worker_num in range(num_workers):
+            if gpu_assignments_remaining:
+                # Use a GPU
+                gpu_num = sorted(
+                    gpu_assignments_remaining,
+                    key=lambda key: gpu_assignments_remaining[key])[0]
+                gpu_assignments_remaining[gpu_num] -= 1
+                if not gpu_assignments_remaining[gpu_num]:
+                    del gpu_assignments_remaining[gpu_num]
+                gpu_assignment = [gpu_num]
+            else:
+                # Use CPU
+                gpu_assignment = []
+            worker_init_kwargs.append({
+                'gpu_device_nums': gpu_assignment,
+                'keras_backend': backend
+            })
+            print("Worker %d assigned GPUs: %s" % (
+                worker_num, gpu_assignment))
+
+    worker_pool = make_worker_pool(
+        processes=num_workers,
+        initializer=worker_init,
+        initializer_kwargs_per_process=worker_init_kwargs,
+        max_tasks_per_worker=max_tasks_per_worker)
+    return worker_pool
+
+
 def make_worker_pool(
     processes=None,
     initializer=None,
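With the two helpers factored out, every command reduces to the same two-step setup: register the shared flags, then ask for a pool and branch on None. A hypothetical new command might look like the sketch below (the argument list passed to parse_args is illustrative, and the import path assumes the package layout implied by the relative imports above):

import argparse

from mhcflurry.parallelism import (
    add_worker_pool_args,
    worker_pool_with_gpu_assignments_from_args)

parser = argparse.ArgumentParser()
add_worker_pool_args(parser)  # adds --num-jobs, --backend, --gpus,
                              # --max-workers-per-gpu, --max-tasks-per-worker

args = parser.parse_args(["--num-jobs", "4", "--max-tasks-per-worker", "50"])

worker_pool = worker_pool_with_gpu_assignments_from_args(args)
if worker_pool is None:
    print("serial run")    # --num-jobs 1: backend was already set if requested
else:
    print("parallel run")  # e.g. worker_pool.imap_unordered(func, items)
    worker_pool.close()
    worker_pool.join()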
...
@@ -20,7 +20,7 @@ tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .common import configure_logging, random_peptides
-from .parallelism import make_worker_pool
+from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
 from .regression_target import from_ic50
@@ -93,23 +93,14 @@ parser.add_argument(
     type=int,
     default=100000,
     help="Num peptides per length to use for consensus scoring")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize selection over. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--backend",
-    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
-    help="Keras backend. If not specified will use system default.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
+add_worker_pool_args(parser)
 
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -208,19 +199,17 @@ def run(argv=sys.argv[1:]):
     metadata_dfs["model_selection_data"] = df
 
     result_predictor = Class1AffinityPredictor(metadata_dataframes=metadata_dfs)
 
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+
     start = time.time()
-    if args.num_jobs == 1:
+    if worker_pool is None:
         # Serial run
         print("Running in serial.")
-        worker_pool = None
         results = (
             model_select(allele) for allele in alleles)
     else:
-        worker_pool = make_worker_pool(
-            processes=(
-                args.num_jobs
-                if args.num_jobs else None))
+        # Parallel run
         random.shuffle(alleles)
         results = worker_pool.imap_unordered(
             model_select,
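One detail of the factored-out pool construction is worth a worked trace: although the log message says "round-robin", choosing the GPU with the fewest remaining slots fills one device completely before moving on to the next, and workers left over once all slots are consumed get an empty assignment and run on CPU. The standalone re-implementation below isolates just that loop (the function name is invented for illustration):

def assign_gpus(num_workers, num_gpus, max_workers_per_gpu):
    # Same slot-accounting as worker_pool_with_gpu_assignments: every GPU
    # starts with max_workers_per_gpu slots; each worker takes a slot from
    # the GPU with the fewest remaining until none are left.
    remaining = dict((gpu, max_workers_per_gpu) for gpu in range(num_gpus))
    assignments = []
    for _ in range(num_workers):
        if remaining:
            gpu = sorted(remaining, key=lambda g: remaining[g])[0]
            remaining[gpu] -= 1
            if not remaining[gpu]:
                del remaining[gpu]
            assignments.append([gpu])
        else:
            assignments.append([])  # out of slots: this worker runs on CPU
    return assignments

print(assign_gpus(num_workers=5, num_gpus=2, max_workers_per_gpu=2))
# -> [[0], [0], [1], [1], []]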
...
@@ -21,7 +21,9 @@ tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging, set_keras_backend
 from .parallelism import (
-    make_worker_pool, cpu_count, call_wrapped_kwargs, worker_init)
+    add_worker_pool_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped_kwargs)
 from .hyperparameters import HyperparameterDefaults
 from .allele_encoding import AlleleEncoding
@@ -58,12 +60,14 @@ parser.add_argument(
     metavar="FILE.json",
     required=True,
     help="JSON or YAML of hyperparameters")
+
 parser.add_argument(
     "--allele",
     default=None,
     nargs="+",
     help="Alleles to train models for. If not specified, all alleles with "
     "enough measurements will be used.")
+
 parser.add_argument(
     "--min-measurements-per-allele",
     type=int,
@@ -76,7 +80,7 @@ parser.add_argument(
     metavar="N",
     default=None,
     help="Hold out 1/N fraction of data (for e.g. subsequent model selection. "
-    "For example, specify 5 to hold out 20% of the data.")
+    "For example, specify 5 to hold out 20 percent of the data.")
 parser.add_argument(
     "--held-out-fraction-seed",
     type=int,
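The one-word edit from "20%" to "20 percent" in the hunk above is a genuine bug fix, not a style change: argparse runs %-style formatting over help strings (that is how %(default)s works), so a bare "%" followed by another character raises ValueError when the help text is rendered. The alternative fix is to escape the sign as "%%", as the sketch below shows (the flag name is hypothetical; the real one is not visible in this hunk):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--example-holdout-flag",  # hypothetical flag name for illustration
    type=int,
    metavar="N",
    help="For example, specify 5 to hold out 20%% of the data.")
print(parser.format_help())  # renders cleanly; with a bare "20%" this raises ValueError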
@@ -106,30 +110,6 @@ parser.add_argument(
     "--allele-sequences",
     metavar="FILE.csv",
     help="Allele sequences file. Used for computing allele similarity matrix.")
-parser.add_argument(
-    "--num-jobs",
-    default=1,
-    type=int,
-    metavar="N",
-    help="Number of processes to parallelize training over. Experimental. "
-    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
-parser.add_argument(
-    "--backend",
-    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
-    help="Keras backend. If not specified will use system default.")
-parser.add_argument(
-    "--gpus",
-    type=int,
-    metavar="N",
-    help="Number of GPUs to attempt to parallelize across. Requires running "
-    "in parallel.")
-parser.add_argument(
-    "--max-workers-per-gpu",
-    type=int,
-    metavar="N",
-    default=1000,
-    help="Maximum number of workers to assign to a GPU. Additional tasks will "
-    "run on CPU.")
 parser.add_argument(
     "--save-interval",
     type=float,
@@ -137,19 +117,13 @@ parser.add_argument(
     default=60,
     help="Write models to disk every N seconds. Only affects parallel runs; "
     "serial runs write each model to disk as it is trained.")
-parser.add_argument(
-    "--max-tasks-per-worker",
-    type=int,
-    metavar="N",
-    default=None,
-    help="Restart workers after N tasks. Workaround for tensorflow memory "
-    "leaks. Requires Python >=3.2.")
 parser.add_argument(
     "--verbosity",
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)
+add_worker_pool_args(parser)
 
 TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(
     subset="all",
@@ -286,52 +260,8 @@ def run(argv=sys.argv[1:]):
         work_items.append(work_dict)
 
     start = time.time()
-    if serial_run:
-        # Serial run.
-        print("Running in serial.")
-        worker_pool = None
-        if args.backend:
-            set_keras_backend(args.backend)
-    else:
-        # Parallel run.
-        num_workers = args.num_jobs if args.num_jobs else cpu_count()
-        worker_init_kwargs = None
-        if args.gpus:
-            print("Attempting to round-robin assign each worker a GPU.")
-            if args.backend != "tensorflow-default":
-                print("Forcing keras backend to be tensorflow-default")
-                args.backend = "tensorflow-default"
-            gpu_assignments_remaining = dict((
-                (gpu, args.max_workers_per_gpu) for gpu in range(args.gpus)
-            ))
-            worker_init_kwargs = []
-            for worker_num in range(num_workers):
-                if gpu_assignments_remaining:
-                    # Use a GPU
-                    gpu_num = sorted(
-                        gpu_assignments_remaining,
-                        key=lambda key: gpu_assignments_remaining[key])[0]
-                    gpu_assignments_remaining[gpu_num] -= 1
-                    if not gpu_assignments_remaining[gpu_num]:
-                        del gpu_assignments_remaining[gpu_num]
-                    gpu_assignment = [gpu_num]
-                else:
-                    # Use CPU
-                    gpu_assignment = []
-                worker_init_kwargs.append({
-                    'gpu_device_nums': gpu_assignment,
-                    'keras_backend': args.backend
-                })
-                print("Worker %d assigned GPUs: %s" % (
-                    worker_num, gpu_assignment))
-        worker_pool = make_worker_pool(
-            processes=num_workers,
-            initializer=worker_init,
-            initializer_kwargs_per_process=worker_init_kwargs,
-            max_tasks_per_worker=args.max_tasks_per_worker)
+
+    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
 
     if worker_pool:
         print("Processing %d work items in parallel." % len(work_items))
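The train command now delegates everything to worker_pool_with_gpu_assignments_from_args, which in turn relies on make_worker_pool accepting per-process initializer kwargs. The commit does not show how that argument is implemented; one plausible mechanism, sketched here purely as an assumption, is to push one kwargs dict per worker onto a shared queue and have each worker's initializer pop its own entry:

from multiprocessing import Pool, Queue

def init_worker(kwargs_queue):
    # Each worker process takes exactly one config entry off the queue,
    # e.g. {'gpu_device_nums': [0], 'keras_backend': 'tensorflow-default'}.
    global WORKER_CONFIG
    WORKER_CONFIG = kwargs_queue.get()

def task(x):
    return (WORKER_CONFIG["gpu_device_nums"], x * x)

if __name__ == "__main__":
    kwargs_queue = Queue()
    for kwargs in [{"gpu_device_nums": [0]}, {"gpu_device_nums": []}]:
        kwargs_queue.put(kwargs)
    with Pool(processes=2, initializer=init_worker,
              initargs=(kwargs_queue,)) as pool:
        print(pool.map(task, range(4)))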