diff --git a/mhcflurry/__init__.py b/mhcflurry/__init__.py
index 3a4433007cf15ab2c7eda935ff9fb820d6050e53..dc67b8bc3bff77fee5d5d845de4f7535eb73a480 100644
--- a/mhcflurry/__init__.py
+++ b/mhcflurry/__init__.py
@@ -18,8 +18,6 @@ from .predict import predict
 from .package_metadata import __version__
 from . import parallelism
 
-parallelism.configure_joblib()
-
 __all__ = [
     "Class1BindingPredictor",
     "predict",
diff --git a/mhcflurry/class1_allele_specific/cross_validation.py b/mhcflurry/class1_allele_specific/cross_validation.py
index b02ea0ede5fd731a7c2e258565be1f48dd558af8..63852548670970e1ac90816b7934cebc1ad54ec8 100644
--- a/mhcflurry/class1_allele_specific/cross_validation.py
+++ b/mhcflurry/class1_allele_specific/cross_validation.py
@@ -20,11 +20,11 @@ from __future__ import (
 import collections
 import logging
 
-from joblib import Parallel, delayed
 import pepdata
 
 from .train import impute_and_select_allele, AlleleSpecificTrainTestFold
+from ..parallelism import get_default_executor
 
 gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4")
 
@@ -100,9 +100,7 @@ def cross_validation_folds(
             'min_observations_per_peptide': 2,
             'min_observations_per_allele': 2,
         },
-        n_jobs=1,
-        verbose=0,
-        pre_dispatch='2*n_jobs'):
+        executor=None):
     '''
     Split a Dataset into n_folds cross validation folds for each allele,
     optionally performing imputation.
@@ -133,26 +131,18 @@ def cross_validation_folds(
         The number of jobs to run in parallel. If -1, then the number of jobs
         is set to the number of cores.
 
-    verbose : integer, optional
-        The joblib verbosity. If non zero, progress messages are printed. Above
-        50, the output is sent to stdout. The frequency of the messages
-        increases with the verbosity level. If it more than 10, all iterations
-        are reported.
-
-    pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
-        The number of joblib batches (of tasks) to be pre-dispatched. Default
-        is "2*n_jobs".
-
     Returns
     -----------
     list of AlleleSpecificTrainTestFold of length num alleles * n_folds
     '''
+    if executor is None:
+        executor = get_default_executor()
+
     if alleles is None:
         alleles = train_data.unique_alleles()
 
-    result = []
-    imputation_tasks = []
+    result_folds = []
     for allele in alleles:
         logging.info("Allele: %s" % allele)
         cv_iter = train_data.cross_validation_iterator(
@@ -176,31 +166,27 @@ def cross_validation_folds(
             test_split = full_test_split
 
         if imputer is not None:
-            imputation_tasks.append(delayed(impute_and_select_allele)(
+            imputation_future = executor.submit(
+                impute_and_select_allele,
                 all_allele_train_split,
                 imputer=imputer,
                 allele=allele,
-                **impute_kwargs))
+                **impute_kwargs)
+        else:
+            imputation_future = None
 
         train_split = all_allele_train_split.get_allele(allele)
         fold = AlleleSpecificTrainTestFold(
             allele=allele,
             train=train_split,
-            imputed_train=None,
+            imputed_train=imputation_future,
             test=test_split)
-        result.append(fold)
-
-    if imputer is not None:
-        imputation_results = Parallel(
-            n_jobs=n_jobs,
-            verbose=verbose,
-            pre_dispatch=pre_dispatch)(imputation_tasks)
-
-        result = [
-            result_fold._replace(
-                imputed_train=imputation_result)
-            for (imputation_result, result_fold)
-            in zip(imputation_results, result)
-            if imputation_result is not None
-        ]
-    return result
+        result_folds.append(fold)
+
+    return [
+        result_fold._replace(imputed_train=(
+            result_fold.imputed_train.result()
+            if result_fold.imputed_train is not None
+            else None))
+        for result_fold in result_folds
+    ]
diff --git a/mhcflurry/class1_allele_specific/cv_and_train_command.py b/mhcflurry/class1_allele_specific/cv_and_train_command.py
index 29615bb1419f0cd2c1f5635e5b6d8a19bb3c851e..cda07b7e9218c155837b2b75918816a10ed573ff 100644
--- a/mhcflurry/class1_allele_specific/cv_and_train_command.py
+++ b/mhcflurry/class1_allele_specific/cv_and_train_command.py
@@ -22,19 +22,14 @@ What it does:
 Features:
     * Supports imputation as a hyperparameter that can be searched over
-    * Parallelized with joblib
+    * Parallelized with concurrent.futures
 
 Note:
-The joblib-based parallelization is primary intended to be used with an
-alternative joblib backend such as dask-distributed that supports
+The parallelization is primarily intended to be used with an
+alternative concurrent.futures Executor such as dask-distributed that supports
 multi-node parallelization. Theano in particular seems to have deadlocks
 when running with single-node parallelization.
-
-Also, when using the multiprocessing backend for joblib (the default),
-the 'fork' mode causes a library we use to hang. We have to instead use
-the 'spawn' or 'forkserver' modes. See:
-https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
 '''
 from __future__ import (
     print_function,
@@ -52,8 +47,10 @@ import hashlib
 import pickle
 
 import numpy
-import joblib
+from dask import distributed
+
+from ..parallelism import set_default_executor, get_default_executor
 
 from ..dataset import Dataset
 from ..imputation_helpers import imputer_from_name
 from .cross_validation import cross_validation_folds
@@ -141,20 +138,6 @@ parser.add_argument(
     metavar="HOST:PORT",
     help="Host and port of dask distributed scheduler")
 
-parser.add_argument(
-    "--joblib-num-jobs",
-    type=int,
-    default=1,
-    metavar="N",
-    help="Number of joblib workers. Set to -1 to use as many jobs as cores. "
-    "Default: %(default)s")
-
-parser.add_argument(
-    "--joblib-pre-dispatch",
-    metavar="STRING",
-    default='2*n_jobs',
-    help="Tasks to initially dispatch to joblib. Default: %(default)s")
-
 parser.add_argument(
     "--min-samples-per-allele",
     default=100,
@@ -183,23 +166,18 @@ def run(argv=sys.argv[1:]):
     if args.verbose:
         logging.basicConfig(level="DEBUG")
     if args.dask_scheduler:
-        import distributed.joblib  # for side effects
-        backend = joblib.parallel_backend(
-            'distributed',
-            scheduler_host=args.dask_scheduler)
-        with backend:
-            active_backend = joblib.parallel.get_active_backend()[0]
-            logging.info(
-                "Running with dask scheduler: %s [%d cores]" % (
-                    args.dask_scheduler,
-                    active_backend.effective_n_jobs()))
-
-            go(args)
-    else:
-        go(args)
+        executor = distributed.Executor(args.dask_scheduler)
+        set_default_executor(executor)
+        logging.info(
+            "Running with dask scheduler: %s [%s cores]" % (
+                args.dask_scheduler,
+                sum(executor.ncores().values())))
+    go(args)
 
 
 def go(args):
+    executor = get_default_executor()
+
     model_architectures = json.loads(args.model_architectures.read())
     logging.info("Read %d model architectures" % len(model_architectures))
     if args.max_models:
@@ -251,10 +229,7 @@ def go(args):
         imputer=imputer,
         impute_kwargs=impute_kwargs,
         drop_similar_peptides=True,
-        alleles=args.alleles,
-        n_jobs=args.joblib_num_jobs,
-        pre_dispatch=args.joblib_pre_dispatch,
-        verbose=1 if not args.quiet else 0)
+        alleles=args.alleles)
 
     logging.info(
         "Training %d model architectures across %d folds = %d models"
@@ -266,10 +241,7 @@ def go(args):
     cv_results = train_across_models_and_folds(
         cv_folds,
         model_architectures,
-        folds_per_task=args.cv_folds_per_task,
-        n_jobs=args.joblib_num_jobs,
-        verbose=1 if not args.quiet else 0,
-        pre_dispatch=args.joblib_pre_dispatch)
+        folds_per_task=args.cv_folds_per_task)
     logging.info(
         "Completed cross validation in %0.2f seconds" % (time.time() - start))
 
@@ -311,7 +283,6 @@ def go(args):
     logging.info("")
     train_folds = []
     train_models = []
-    imputation_tasks = []
     for (allele_num, allele) in enumerate(cv_results.allele.unique()):
         best_index = best_architectures_by_allele[allele]
         architecture = model_architectures[best_index]
@@ -321,14 +292,14 @@ def go(args):
             (allele, best_index, architecture))
 
         if architecture['impute']:
-            imputation_task = joblib.delayed(impute_and_select_allele)(
+            imputation_future = executor.submit(
+                impute_and_select_allele,
                 train_data,
                 imputer=imputer,
                 allele=allele,
                 **impute_kwargs)
-            imputation_tasks.append(imputation_task)
         else:
-            imputation_task = None
+            imputation_future = None
 
         test_data_this_allele = None
         if test_data is not None:
@@ -344,29 +315,17 @@ def go(args):
             # the imputations so we have to queue up the tasks first.
             # If we are not doing imputation then the imputation_task
             # is None.
-            imputed_train=imputation_task,
+            imputed_train=imputation_future,
             test=test_data_this_allele)
         train_folds.append(fold)
 
-    if imputation_tasks:
-        logging.info(
-            "Waiting for %d full-data imputation tasks to complete"
-            % len(imputation_tasks))
-        imputation_results = joblib.Parallel(
-            n_jobs=args.joblib_num_jobs,
-            verbose=1 if not args.quiet else 0,
-            pre_dispatch=args.joblib_pre_dispatch)(imputation_tasks)
-
-        train_folds = [
-            train_fold._replace(
-                # Now we replace imputed_train with the actual imputed
-                # dataset.
-                imputed_train=imputation_results.pop(0)
-                if (train_fold.imputed_train is not None) else None)
-            for train_fold in train_folds
-        ]
-        assert not imputation_results
-    del imputation_tasks
+    train_folds = [
+        result_fold._replace(imputed_train=(
+            result_fold.imputed_train.result()
+            if result_fold.imputed_train is not None
+            else None))
+        for result_fold in train_folds
+    ]
 
     logging.info("Training %d production models" % len(train_folds))
     start = time.time()
@@ -374,10 +333,7 @@ def go(args):
         train_folds,
         train_models,
         cartesian_product_of_folds_and_models=False,
-        return_predictors=args.out_models_dir is not None,
-        n_jobs=args.joblib_num_jobs,
-        verbose=1 if not args.quiet else 0,
-        pre_dispatch=args.joblib_pre_dispatch)
+        return_predictors=args.out_models_dir is not None)
     logging.info(
         "Completed production training in %0.2f seconds"
         % (time.time() - start))
diff --git a/mhcflurry/class1_allele_specific/train.py b/mhcflurry/class1_allele_specific/train.py
index c59719b341657f32f0fc280fcb59b0a196ddad9e..73937f7a06144267367b75e5728a580574e03e69 100644
--- a/mhcflurry/class1_allele_specific/train.py
+++ b/mhcflurry/class1_allele_specific/train.py
@@ -28,11 +28,11 @@ import pandas
 
 import mhcflurry
 
-from joblib import Parallel, delayed
-
 from .scoring import make_scores
 from .class1_binding_predictor import Class1BindingPredictor
 from ..hyperparameters import HyperparameterDefaults
+from ..parallelism import get_default_executor, map_throw_fast
+
 
 TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
 HYPERPARAMETER_DEFAULTS = (
@@ -239,9 +239,7 @@ def train_across_models_and_folds(
         cartesian_product_of_folds_and_models=True,
         return_predictors=False,
         folds_per_task=1,
-        n_jobs=1,
-        verbose=0,
-        pre_dispatch='2*n_jobs'):
+        executor=None):
     '''
     Train and optionally test any number of models across any number of
     folds.
@@ -261,24 +259,15 @@ def train_across_models_and_folds(
     return_predictors : boolean, optional
         Include the trained predictors in the result.
 
-    n_jobs : integer, optional
-        The number of jobs to run in parallel. If -1, then the number of jobs
-        is set to the number of cores.
-
-    verbose : integer, optional
-        The joblib verbosity. If non zero, progress messages are printed. Above
-        50, the output is sent to stdout. The frequency of the messages
-        increases with the verbosity level. If it more than 10, all iterations
-        are reported.
-
-    pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
-        The number of joblib batches (of tasks) to be pre-dispatched. Default
-        is "2*n_jobs".
+    executor : concurrent.futures.Executor, optional
 
     Returns
     -----------
     pandas.DataFrame
     '''
+    if executor is None:
+        executor = get_default_executor()
+
     if cartesian_product_of_folds_and_models:
         tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
         fold_index_groups = [[] for _ in range(tasks_per_model)]
@@ -307,15 +296,17 @@ def train_across_models_and_folds(
     logging.info("Training %d architectures on %d folds = %d tasks." % (
         len(model_descriptions),
         len(folds),
         len(task_model_and_fold_indices)))
-    task_results = Parallel(
-        n_jobs=n_jobs,
-        verbose=verbose,
-        pre_dispatch=pre_dispatch)(
-        delayed(train_and_test_one_model)(
+    def train_and_test_one_model_task(model_and_fold_nums_pair):
+        (model_num, fold_nums) = model_and_fold_nums_pair
+        return train_and_test_one_model(
             model_descriptions[model_num],
             [folds[i] for i in fold_nums],
             return_predictor=return_predictors)
-        for (model_num, fold_nums) in task_model_and_fold_indices)
+
+    task_results = map_throw_fast(
+        executor,
+        train_and_test_one_model_task,
+        task_model_and_fold_indices)
 
     logging.info("Done.")
diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py
index 00c8302f12a47ed2dbd21836a7e00d095ba5f81c..73e9cfce34cd686a99beb2b307e2ad7c8d584ecd 100644
--- a/mhcflurry/parallelism.py
+++ b/mhcflurry/parallelism.py
@@ -1,30 +1,34 @@
-import multiprocessing
+from concurrent import futures
 import logging
 
-import joblib.parallel
+DEFAULT_EXECUTOR = None
 
 
-def configure_joblib(multiprocessing_mode="spawn"):
-    """
-    Set joblib's default multiprocessing mode.
+def set_default_executor(executor):
+    global DEFAULT_EXECUTOR
+    DEFAULT_EXECUTOR = executor
 
-    The default used in joblib is "fork" which causes a library we use to
-    deadlock. This function defaults to setting the multiprocessing mode
-    to "spawn", which does not deadlock. On Python 3.4, you can also try
-    the "forkserver" mode which does not deadlock and has better
-    performance.
-    See: https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
 
+def get_default_executor():
+    global DEFAULT_EXECUTOR
+    if DEFAULT_EXECUTOR is None:
+        DEFAULT_EXECUTOR = futures.ThreadPoolExecutor(max_workers=1)
+    return DEFAULT_EXECUTOR
 
-    Parameters
-    -------------
-    multiprocessing_mode : string, one of "spawn", "fork", or "forkserver"
-    """
-    if hasattr(multiprocessing, "get_context"):
-        joblib.parallel.DEFAULT_MP_CONTEXT = multiprocessing.get_context(
-            multiprocessing_mode)
-    else:
-        logging.warn(
-            "You will probably get deadlocks on Python earlier than 3.4 "
-            "if you set n_jobs to anything other than 1.")
+
+def map_throw_fast(executor, func, iterable):
+    # Named fs rather than futures to avoid shadowing the concurrent.futures
+    # module imported above.
+    fs = [
+        executor.submit(func, arg) for arg in iterable
+    ]
+    return wait_all_throw_fast(fs)
+
+
+def wait_all_throw_fast(fs):
+    result_dict = {}
+    for finished_future in futures.as_completed(fs):
+        result = finished_future.result()
+        result_dict[finished_future] = result
+        logging.info("%3d / %3d tasks completed" % (
+            len(result_dict), len(fs)))
+
+    return [result_dict[future] for future in fs]
diff --git a/setup.py b/setup.py
index 32f478f49b9cc341dd6af1344e1354040965ce89..4583f51880513a6119a46427d05260b9e24e393b 100644
--- a/setup.py
+++ b/setup.py
@@ -81,8 +81,7 @@ if __name__ == '__main__':
             'h5py',
             'typechecks',
             'pepdata',
-            'joblib',
-            'cherrypy',  # for multi-threaded web server
+            'futures',
             'bottle',
             'six',
         ],
diff --git a/test/test_class1_allele_specific_cv_and_train_command.py b/test/test_class1_allele_specific_cv_and_train_command.py
index 204920b29ab525d9451ad5563f7e11469fe2cfbd..a439b5151977414f6e8f4f9cb7addec2086159b5 100644
--- a/test/test_class1_allele_specific_cv_and_train_command.py
+++ b/test/test_class1_allele_specific_cv_and_train_command.py
@@ -58,7 +58,6 @@ def test_small_run():
         "--out-production-results", join(temp_dir, "production.csv"),
         "--out-models", join(temp_dir, "models"),
         "--cv-num-folds", "2",
-        "--joblib-num-jobs", "1",
         "--alleles", "HLA-A0201", "HLA-A0301",
         "--verbose",
     ]
diff --git a/test/test_cross_validation.py b/test/test_cross_validation.py
index eaa1348a20f18eaf275f3532a27a0155b446f7fc..cf95333b81c993ef0282af397abd85658f737d8b 100644
--- a/test/test_cross_validation.py
+++ b/test/test_cross_validation.py
@@ -28,10 +28,7 @@ def test_imputation():
         n_folds=3,
         imputer=imputer,
         drop_similar_peptides=True,
-        alleles=["HLA-A0201", "HLA-A0202"],
-        n_jobs=2,
-        verbose=5,
-    )
+        alleles=["HLA-A0201", "HLA-A0202"])
 
     eq_(set(x.allele for x in folds), {"HLA-A0201", "HLA-A0202"})
     eq_(len(folds), 6)
@@ -70,11 +67,7 @@ def test_cross_validation_no_imputation():
         n_training_epochs=[3])
     print(models)
 
-    df = train_across_models_and_folds(
-        folds,
-        models,
-        n_jobs=2,
-        verbose=50)
+    df = train_across_models_and_folds(folds, models)
     print(df)
     assert df.test_auc.mean() > 0.6
 
@@ -92,10 +85,7 @@ def test_cross_validation_with_imputation():
         n_folds=3,
         imputer=imputer,
         drop_similar_peptides=True,
-        alleles=["HLA-A0201", "HLA-A0202"],
-        n_jobs=3,
-        verbose=5,
-    )
+        alleles=["HLA-A0201", "HLA-A0202"])
 
     eq_(set(x.allele for x in folds), {"HLA-A0201", "HLA-A0202"})
     eq_(len(folds), 6)
@@ -112,10 +102,6 @@ def test_cross_validation_with_imputation():
         n_training_epochs=[3])
     print(models)
 
-    df = train_across_models_and_folds(
-        folds,
-        models,
-        n_jobs=3,
-        verbose=5)
+    df = train_across_models_and_folds(folds, models)
     print(df)
     assert df.test_auc.mean() > 0.6
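
Usage note (not part of the patch): a minimal sketch of how a caller might plug a dask-distributed executor into the refactored API, mirroring what run() in cv_and_train_command.py does above. The scheduler address and training-data path are placeholders, and Dataset.from_csv / HYPERPARAMETER_DEFAULTS.models_grid are assumed helpers from the surrounding codebase rather than functions introduced by this diff.

    # Sketch only: exercises the executor-based API introduced by this patch.
    from dask import distributed

    from mhcflurry.dataset import Dataset
    from mhcflurry.parallelism import set_default_executor
    from mhcflurry.class1_allele_specific.cross_validation import (
        cross_validation_folds)
    from mhcflurry.class1_allele_specific.train import (
        HYPERPARAMETER_DEFAULTS, train_across_models_and_folds)

    # Any concurrent.futures-style Executor works here; dask-distributed gives
    # multi-node parallelism. "127.0.0.1:8786" is a placeholder scheduler address.
    set_default_executor(distributed.Executor("127.0.0.1:8786"))

    # Hypothetical affinity data; any Dataset covering the alleles below works.
    train_data = Dataset.from_csv("train_data.csv")

    folds = cross_validation_folds(
        train_data,
        n_folds=3,
        alleles=["HLA-A0201", "HLA-A0202"])

    # Small model grid, as in test_cross_validation.py.
    models = HYPERPARAMETER_DEFAULTS.models_grid(
        impute=[False],
        n_training_epochs=[3])

    results_df = train_across_models_and_folds(folds, models)
    print(results_df.test_auc.mean())

Calls without an explicit executor argument fall back to the single-worker ThreadPoolExecutor created in mhcflurry/parallelism.py, so existing callers keep their current serial behavior.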