Commit 20de13c4 authored by Tim O'Donnell

Switch off joblib in favor of concurrent.futures

Fixes #59
parent 573c0736
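The change swaps joblib's Parallel/delayed task lists for futures submitted to a concurrent.futures Executor. A minimal before/after sketch of that pattern, outside this codebase (square and inputs are placeholders, not names from the commit):

    from concurrent import futures

    def square(x):
        # Stand-in for a real task such as impute_and_select_allele.
        return x * x

    inputs = [1, 2, 3]

    # Before (joblib):
    #     from joblib import Parallel, delayed
    #     results = Parallel(n_jobs=2)(delayed(square)(x) for x in inputs)

    # After (concurrent.futures): submit tasks up front, resolve them later.
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        pending = [executor.submit(square, x) for x in inputs]
        results = [f.result() for f in pending]

    print(results)  # [1, 4, 9]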
......@@ -18,8 +18,6 @@ from .predict import predict
from .package_metadata import __version__
from . import parallelism
parallelism.configure_joblib()
__all__ = [
"Class1BindingPredictor",
"predict",
......
......@@ -20,11 +20,11 @@ from __future__ import (
import collections
import logging
from joblib import Parallel, delayed
import pepdata
from .train import impute_and_select_allele, AlleleSpecificTrainTestFold
from ..parallelism import get_default_executor
gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4")
......@@ -100,9 +100,7 @@ def cross_validation_folds(
'min_observations_per_peptide': 2,
'min_observations_per_allele': 2,
},
n_jobs=1,
verbose=0,
pre_dispatch='2*n_jobs'):
executor=None):
'''
Split a Dataset into n_folds cross validation folds for each allele,
optionally performing imputation.
......@@ -133,26 +131,18 @@ def cross_validation_folds(
The number of jobs to run in parallel. If -1, then the number of jobs
is set to the number of cores.
verbose : integer, optional
The joblib verbosity. If non-zero, progress messages are printed. Above
50, the output is sent to stdout. The frequency of the messages
increases with the verbosity level. If it is more than 10, all iterations
are reported.
pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
The number of joblib batches (of tasks) to be pre-dispatched. Default
is "2*n_jobs".
Returns
-----------
list of AlleleSpecificTrainTestFold of length num alleles * n_folds
'''
if executor is None:
executor = get_default_executor()
if alleles is None:
alleles = train_data.unique_alleles()
result = []
imputation_tasks = []
result_folds = []
for allele in alleles:
logging.info("Allele: %s" % allele)
cv_iter = train_data.cross_validation_iterator(
......@@ -176,31 +166,27 @@ def cross_validation_folds(
test_split = full_test_split
if imputer is not None:
imputation_tasks.append(delayed(impute_and_select_allele)(
imputation_future = executor.submit(
impute_and_select_allele,
all_allele_train_split,
imputer=imputer,
allele=allele,
**impute_kwargs))
**impute_kwargs)
else:
imputation_future = None
train_split = all_allele_train_split.get_allele(allele)
fold = AlleleSpecificTrainTestFold(
allele=allele,
train=train_split,
imputed_train=None,
imputed_train=imputation_future,
test=test_split)
result.append(fold)
if imputer is not None:
imputation_results = Parallel(
n_jobs=n_jobs,
verbose=verbose,
pre_dispatch=pre_dispatch)(imputation_tasks)
result = [
result_fold._replace(
imputed_train=imputation_result)
for (imputation_result, result_fold)
in zip(imputation_results, result)
if imputation_result is not None
]
return result
result_folds.append(fold)
return [
result_fold._replace(imputed_train=(
result_fold.imputed_train.result()
if result_fold.imputed_train is not None
else None))
for result_fold in result_folds
]
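Illustrative only: the return expression above resolves each pending imputation future and swaps it back into the fold via _replace. A toy version of that submit-then-resolve pattern (Fold and the lambda are placeholders, not names from this module):

    from collections import namedtuple
    from concurrent import futures

    Fold = namedtuple("Fold", ["allele", "imputed_train"])

    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        pending = [
            Fold(allele="A", imputed_train=executor.submit(lambda: "imputed-A")),
            Fold(allele="B", imputed_train=None),  # no imputation requested
        ]
        resolved = [
            fold._replace(imputed_train=(
                fold.imputed_train.result()
                if fold.imputed_train is not None
                else None))
            for fold in pending
        ]
    print(resolved)  # imputed_train now holds the value (or None), not a Future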
......@@ -22,19 +22,14 @@ What it does:
Features:
* Supports imputation as a hyperparameter that can be searched over
* Parallelized with joblib
* Parallelized with concurrent.futures
Note:
The joblib-based parallelization is primarily intended to be used with an
alternative joblib backend such as dask-distributed that supports
The parallelization is primarily intended to be used with an
alternative concurrent.futures Executor such as dask-distributed that supports
multi-node parallelization. Theano in particular seems to have deadlocks
when running with single-node parallelization. (A sketch of this executor
setup follows the imports below.)
Also, when using the multiprocessing backend for joblib (the default),
the 'fork' mode causes a library we use to hang. We have to instead use
the 'spawn' or 'forkserver' modes. See:
https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
'''
from __future__ import (
print_function,
......@@ -52,8 +47,10 @@ import hashlib
import pickle
import numpy
import joblib
from dask import distributed
from ..parallelism import set_default_executor, get_default_executor
from ..dataset import Dataset
from ..imputation_helpers import imputer_from_name
from .cross_validation import cross_validation_folds
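A hedged sketch of the executor wiring described in the docstring note above: register an executor once, and everything that calls get_default_executor() picks it up. The scheduler address is a placeholder, the module path mhcflurry.parallelism is inferred from the relative imports in this diff, and distributed.Executor is the class used in run() below (renamed Client in later dask versions):

    from concurrent import futures
    from mhcflurry.parallelism import set_default_executor, get_default_executor

    # Multi-node: point at a dask-distributed scheduler (address is a placeholder).
    #     from dask import distributed
    #     set_default_executor(distributed.Executor("127.0.0.1:8786"))

    # Single-node: any concurrent.futures Executor works the same way.
    set_default_executor(futures.ThreadPoolExecutor(max_workers=4))

    print(get_default_executor())  # the executor cross-validation and training will use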
......@@ -141,20 +138,6 @@ parser.add_argument(
metavar="HOST:PORT",
help="Host and port of dask distributed scheduler")
parser.add_argument(
"--joblib-num-jobs",
type=int,
default=1,
metavar="N",
help="Number of joblib workers. Set to -1 to use as many jobs as cores. "
"Default: %(default)s")
parser.add_argument(
"--joblib-pre-dispatch",
metavar="STRING",
default='2*n_jobs',
help="Tasks to initially dispatch to joblib. Default: %(default)s")
parser.add_argument(
"--min-samples-per-allele",
default=100,
......@@ -183,23 +166,18 @@ def run(argv=sys.argv[1:]):
if args.verbose:
logging.basicConfig(level="DEBUG")
if args.dask_scheduler:
import distributed.joblib # for side effects
backend = joblib.parallel_backend(
'distributed',
scheduler_host=args.dask_scheduler)
with backend:
active_backend = joblib.parallel.get_active_backend()[0]
logging.info(
"Running with dask scheduler: %s [%d cores]" % (
args.dask_scheduler,
active_backend.effective_n_jobs()))
go(args)
else:
go(args)
executor = distributed.Executor(args.dask_scheduler)
set_default_executor(executor)
logging.info(
"Running with dask scheduler: %s [%s cores]" % (
args.dask_scheduler,
sum(executor.ncores().values())))
go(args)
def go(args):
executor = get_default_executor()
model_architectures = json.loads(args.model_architectures.read())
logging.info("Read %d model architectures" % len(model_architectures))
if args.max_models:
......@@ -251,10 +229,7 @@ def go(args):
imputer=imputer,
impute_kwargs=impute_kwargs,
drop_similar_peptides=True,
alleles=args.alleles,
n_jobs=args.joblib_num_jobs,
pre_dispatch=args.joblib_pre_dispatch,
verbose=1 if not args.quiet else 0)
alleles=args.alleles)
logging.info(
"Training %d model architectures across %d folds = %d models"
......@@ -266,10 +241,7 @@ def go(args):
cv_results = train_across_models_and_folds(
cv_folds,
model_architectures,
folds_per_task=args.cv_folds_per_task,
n_jobs=args.joblib_num_jobs,
verbose=1 if not args.quiet else 0,
pre_dispatch=args.joblib_pre_dispatch)
folds_per_task=args.cv_folds_per_task)
logging.info(
"Completed cross validation in %0.2f seconds" % (time.time() - start))
......@@ -311,7 +283,6 @@ def go(args):
logging.info("")
train_folds = []
train_models = []
imputation_tasks = []
for (allele_num, allele) in enumerate(cv_results.allele.unique()):
best_index = best_architectures_by_allele[allele]
architecture = model_architectures[best_index]
......@@ -321,14 +292,14 @@ def go(args):
(allele, best_index, architecture))
if architecture['impute']:
imputation_task = joblib.delayed(impute_and_select_allele)(
imputation_future = executor.submit(
impute_and_select_allele,
train_data,
imputer=imputer,
allele=allele,
**impute_kwargs)
imputation_tasks.append(imputation_task)
else:
imputation_task = None
imputation_future = None
test_data_this_allele = None
if test_data is not None:
......@@ -344,29 +315,17 @@ def go(args):
# the imputations so we have to queue up the tasks first.
# If we are not doing imputation then the imputation_task
# is None.
imputed_train=imputation_task,
imputed_train=imputation_future,
test=test_data_this_allele)
train_folds.append(fold)
if imputation_tasks:
logging.info(
"Waiting for %d full-data imputation tasks to complete"
% len(imputation_tasks))
imputation_results = joblib.Parallel(
n_jobs=args.joblib_num_jobs,
verbose=1 if not args.quiet else 0,
pre_dispatch=args.joblib_pre_dispatch)(imputation_tasks)
train_folds = [
train_fold._replace(
# Now we replace imputed_train with the actual imputed
# dataset.
imputed_train=imputation_results.pop(0)
if (train_fold.imputed_train is not None) else None)
for train_fold in train_folds
]
assert not imputation_results
del imputation_tasks
train_folds = [
result_fold._replace(imputed_train=(
result_fold.imputed_train.result()
if result_fold.imputed_train is not None
else None))
for result_fold in train_folds
]
logging.info("Training %d production models" % len(train_folds))
start = time.time()
......@@ -374,10 +333,7 @@ def go(args):
train_folds,
train_models,
cartesian_product_of_folds_and_models=False,
return_predictors=args.out_models_dir is not None,
n_jobs=args.joblib_num_jobs,
verbose=1 if not args.quiet else 0,
pre_dispatch=args.joblib_pre_dispatch)
return_predictors=args.out_models_dir is not None)
logging.info(
"Completed production training in %0.2f seconds"
% (time.time() - start))
......
......@@ -28,11 +28,11 @@ import pandas
import mhcflurry
from joblib import Parallel, delayed
from .scoring import make_scores
from .class1_binding_predictor import Class1BindingPredictor
from ..hyperparameters import HyperparameterDefaults
from ..parallelism import get_default_executor, map_throw_fast
TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
HYPERPARAMETER_DEFAULTS = (
......@@ -239,9 +239,7 @@ def train_across_models_and_folds(
cartesian_product_of_folds_and_models=True,
return_predictors=False,
folds_per_task=1,
n_jobs=1,
verbose=0,
pre_dispatch='2*n_jobs'):
executor=None):
'''
Train and optionally test any number of models across any number of folds.
......@@ -261,24 +259,15 @@ def train_across_models_and_folds(
return_predictors : boolean, optional
Include the trained predictors in the result.
n_jobs : integer, optional
The number of jobs to run in parallel. If -1, then the number of jobs
is set to the number of cores.
verbose : integer, optional
The joblib verbosity. If non-zero, progress messages are printed. Above
50, the output is sent to stdout. The frequency of the messages
increases with the verbosity level. If it is more than 10, all iterations
are reported.
pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
The number of joblib batches (of tasks) to be pre-dispatched. Default
is "2*n_jobs".
executor : concurrent.futures.Executor, optional
The executor to submit training tasks to. If None, the module default
from get_default_executor() is used.
Returns
-----------
pandas.DataFrame
'''
if executor is None:
executor = get_default_executor()
if cartesian_product_of_folds_and_models:
tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
fold_index_groups = [[] for _ in range(tasks_per_model)]
......@@ -307,15 +296,17 @@ def train_across_models_and_folds(
logging.info("Training %d architectures on %d folds = %d tasks." % (
len(model_descriptions), len(folds), len(task_model_and_fold_indices)))
task_results = Parallel(
n_jobs=n_jobs,
verbose=verbose,
pre_dispatch=pre_dispatch)(
delayed(train_and_test_one_model)(
def train_and_test_one_model_task(model_and_fold_nums_pair):
(model_num, fold_nums) = model_and_fold_nums_pair
return train_and_test_one_model(
model_descriptions[model_num],
[folds[i] for i in fold_nums],
return_predictor=return_predictors)
for (model_num, fold_nums) in task_model_and_fold_indices)
task_results = map_throw_fast(
executor,
train_and_test_one_model_task,
task_model_and_fold_indices)
logging.info("Done.")
......
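A quick check of the fold-grouping arithmetic in the hunk above, assuming the cartesian case pairs every architecture with every group of folds (the numbers are made up for illustration):

    import math

    n_models = 3          # model architectures
    n_folds = 5           # cross-validation folds
    folds_per_task = 2

    tasks_per_model = int(math.ceil(float(n_folds) / folds_per_task))
    print(tasks_per_model)             # ceil(5 / 2) = 3 groups of folds
    print(n_models * tasks_per_model)  # 3 architectures x 3 groups = 9 tasks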
import multiprocessing
from concurrent import futures
import logging
import joblib.parallel
DEFAULT_EXECUTOR = None
def set_default_executor(executor):
    global DEFAULT_EXECUTOR
    DEFAULT_EXECUTOR = executor


def get_default_executor():
    global DEFAULT_EXECUTOR
    if DEFAULT_EXECUTOR is None:
        DEFAULT_EXECUTOR = futures.ThreadPoolExecutor(max_workers=1)
    return DEFAULT_EXECUTOR


def configure_joblib(multiprocessing_mode="spawn"):
    """
    Set joblib's default multiprocessing mode.

    The default used in joblib is "fork" which causes a library we use to
    deadlock. This function defaults to setting the multiprocessing mode
    to "spawn", which does not deadlock. On Python 3.4, you can also try
    the "forkserver" mode which does not deadlock and has better
    performance.
    See: https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries

    Parameters
    -------------
    multiprocessing_mode : string, one of "spawn", "fork", or "forkserver"
    """
    if hasattr(multiprocessing, "get_context"):
        joblib.parallel.DEFAULT_MP_CONTEXT = multiprocessing.get_context(
            multiprocessing_mode)
    else:
        logging.warn(
            "You will probably get deadlocks on Python earlier than 3.4 "
            "if you set n_jobs to anything other than 1.")
def map_throw_fast(executor, func, iterable):
futures = [
executor.submit(func, arg) for arg in iterable
]
return wait_all_throw_fast(futures)
def wait_all_throw_fast(fs):
result_dict = {}
for finished_future in futures.as_completed(fs):
result = finished_future.result()
logging.info("%3d / %3d tasks completed" % (
len(result_dict), len(fs)))
result_dict[finished_future] = result
return [result_dict[future] for future in fs]
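A hedged usage sketch for the two helpers above: map_throw_fast returns results in the order the arguments were given, while iterating as_completed inside wait_all_throw_fast lets the first failing task raise immediately, hence "throw fast". The square function is a stand-in, and the module path is inferred from the relative imports elsewhere in this diff:

    from concurrent import futures
    from mhcflurry.parallelism import map_throw_fast

    def square(x):
        return x * x

    executor = futures.ThreadPoolExecutor(max_workers=2)
    print(map_throw_fast(executor, square, [3, 1, 2]))  # [9, 1, 4], input order preserved
    executor.shutdown()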
......@@ -81,8 +81,7 @@ if __name__ == '__main__':
'h5py',
'typechecks',
'pepdata',
'joblib',
'cherrypy', # for multi-threaded web server
'futures',  # concurrent.futures backport for Python 2
'bottle',
'six',
],
......
......@@ -58,7 +58,6 @@ def test_small_run():
"--out-production-results", join(temp_dir, "production.csv"),
"--out-models", join(temp_dir, "models"),
"--cv-num-folds", "2",
"--joblib-num-jobs", "1",
"--alleles", "HLA-A0201", "HLA-A0301",
"--verbose",
]
......
......@@ -28,10 +28,7 @@ def test_imputation():
n_folds=3,
imputer=imputer,
drop_similar_peptides=True,
alleles=["HLA-A0201", "HLA-A0202"],
n_jobs=2,
verbose=5,
)
alleles=["HLA-A0201", "HLA-A0202"])
eq_(set(x.allele for x in folds), {"HLA-A0201", "HLA-A0202"})
eq_(len(folds), 6)
......@@ -70,11 +67,7 @@ def test_cross_validation_no_imputation():
n_training_epochs=[3])
print(models)
df = train_across_models_and_folds(
folds,
models,
n_jobs=2,
verbose=50)
df = train_across_models_and_folds(folds, models)
print(df)
assert df.test_auc.mean() > 0.6
......@@ -92,10 +85,7 @@ def test_cross_validation_with_imputation():
n_folds=3,
imputer=imputer,
drop_similar_peptides=True,
alleles=["HLA-A0201", "HLA-A0202"],
n_jobs=3,
verbose=5,
)
alleles=["HLA-A0201", "HLA-A0202"])
eq_(set(x.allele for x in folds), {"HLA-A0201", "HLA-A0202"})
eq_(len(folds), 6)
......@@ -112,10 +102,6 @@ def test_cross_validation_with_imputation():
n_training_epochs=[3])
print(models)
df = train_across_models_and_folds(
folds,
models,
n_jobs=3,
verbose=5)
df = train_across_models_and_folds(folds, models)
print(df)
assert df.test_auc.mean() > 0.6