From c6f884c5286520161f0eb8f386d54fc4bd8100b8 Mon Sep 17 00:00:00 2001 From: Tim O'Donnell <timodonnell@gmail.com> Date: Fri, 23 Sep 2016 11:16:56 -0400 Subject: [PATCH] reorganize parallel code using ParallelBackend class --- .../models.py | 6 +- .../cross_validation.py | 11 ++- .../cv_and_train_command.py | 53 ++++++++---- mhcflurry/class1_allele_specific/train.py | 13 ++- mhcflurry/parallelism.py | 82 ++++++++++++++----- ...s1_allele_specific_cv_and_train_command.py | 1 + 6 files changed, 114 insertions(+), 52 deletions(-) diff --git a/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py b/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py index 6e20239c..6375cd45 100644 --- a/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py +++ b/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py @@ -3,13 +3,13 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS import json models = HYPERPARAMETER_DEFAULTS.models_grid( - impute=[False, True], + #impute=[False, True], + impute=[False], activation=["tanh"], layer_sizes=[[12], [64], [128]], embedding_output_dim=[8, 32, 64], dropout_probability=[0, .1, .25], - pretrain_decay=["1 / (1+epoch)**2"], - fraction_negative=[0, .1, .2], + # fraction_negative=[0, .1, .2], n_training_epochs=[250]) sys.stderr.write("Models: %d\n" % len(models)) diff --git a/mhcflurry/class1_allele_specific/cross_validation.py b/mhcflurry/class1_allele_specific/cross_validation.py index 63852548..6bcd092c 100644 --- a/mhcflurry/class1_allele_specific/cross_validation.py +++ b/mhcflurry/class1_allele_specific/cross_validation.py @@ -20,11 +20,10 @@ from __future__ import ( import collections import logging - import pepdata from .train import impute_and_select_allele, AlleleSpecificTrainTestFold -from ..parallelism import get_default_executor +from ..parallelism import get_default_backend gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4") @@ -100,7 +99,7 @@ def cross_validation_folds( 'min_observations_per_peptide': 2, 'min_observations_per_allele': 2, }, - executor=None): + parallel_backend=None): ''' Split a Dataset into n_folds cross validation folds for each allele, optionally performing imputation. @@ -136,8 +135,8 @@ def cross_validation_folds( list of AlleleSpecificTrainTestFold of length num alleles * n_folds ''' - if executor is None: - executor = get_default_executor() + if parallel_backend is None: + parallel_backend = get_default_backend() if alleles is None: alleles = train_data.unique_alleles() @@ -166,7 +165,7 @@ def cross_validation_folds( test_split = full_test_split if imputer is not None: - imputation_future = executor.submit( + imputation_future = parallel_backend.submit( impute_and_select_allele, all_allele_train_split, imputer=imputer, diff --git a/mhcflurry/class1_allele_specific/cv_and_train_command.py b/mhcflurry/class1_allele_specific/cv_and_train_command.py index cda07b7e..fea31a95 100644 --- a/mhcflurry/class1_allele_specific/cv_and_train_command.py +++ b/mhcflurry/class1_allele_specific/cv_and_train_command.py @@ -48,9 +48,7 @@ import pickle import numpy -from dask import distributed - -from ..parallelism import set_default_executor, get_default_executor +from .. 
import parallelism from ..dataset import Dataset from ..imputation_helpers import imputer_from_name from .cross_validation import cross_validation_folds @@ -138,6 +136,19 @@ parser.add_argument( metavar="HOST:PORT", help="Host and port of dask distributed scheduler") +parser.add_argument( + "--num-local-processes", + metavar="N", + type=int, + help="Processes (exclusive with --dask-scheduler and --num-local-threads)") + +parser.add_argument( + "--num-local-threads", + metavar="N", + type=int, + default=1, + help="Threads (exclusive with --dask-scheduler and --num-local-processes)") + parser.add_argument( "--min-samples-per-allele", default=100, @@ -161,22 +172,36 @@ parser.add_argument( def run(argv=sys.argv[1:]): args = parser.parse_args(argv) - if not args.quiet: - logging.basicConfig(level="INFO") if args.verbose: - logging.basicConfig(level="DEBUG") + logging.root.setLevel(level="DEBUG") + elif not args.quiet: + logging.root.setLevel(level="INFO") + + logging.info("Running with arguments: %s" % args) + print("Past logging") + + + # Set parallel backend if args.dask_scheduler: - executor = distributed.Executor(args.dask_scheduler) - set_default_executor(executor) - logging.info( - "Running with dask scheduler: %s [%s cores]" % ( - args.dask_scheduler, - sum(executor.ncores().values()))) + backend = parallelism.DaskDistributedParallelBackend( + args.dask_scheduler) + else: + if args.num_local_processes: + backend = parallelism.ConcurrentFuturesParallelBackend( + args.num_local_processes, + processes=True) + else: + backend = parallelism.ConcurrentFuturesParallelBackend( + args.num_local_threads, + processes=False) + + parallelism.set_default_backend(backend) + logging.info("Using parallel backend: %s" % backend) go(args) def go(args): - executor = get_default_executor() + backend = parallelism.get_default_backend() model_architectures = json.loads(args.model_architectures.read()) logging.info("Read %d model architectures" % len(model_architectures)) @@ -292,7 +317,7 @@ def go(args): (allele, best_index, architecture)) if architecture['impute']: - imputation_future = executor.submit( + imputation_future = backend.submit( impute_and_select_allele, train_data, imputer=imputer, diff --git a/mhcflurry/class1_allele_specific/train.py b/mhcflurry/class1_allele_specific/train.py index 73937f7a..c4f29e49 100644 --- a/mhcflurry/class1_allele_specific/train.py +++ b/mhcflurry/class1_allele_specific/train.py @@ -31,7 +31,7 @@ import mhcflurry from .scoring import make_scores from .class1_binding_predictor import Class1BindingPredictor from ..hyperparameters import HyperparameterDefaults -from ..parallelism import get_default_executor, map_throw_fast +from ..parallelism import get_default_backend TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False) @@ -239,7 +239,7 @@ def train_across_models_and_folds( cartesian_product_of_folds_and_models=True, return_predictors=False, folds_per_task=1, - executor=None): + parallel_backend=None): ''' Train and optionally test any number of models across any number of folds. @@ -259,14 +259,14 @@ def train_across_models_and_folds( return_predictors : boolean, optional Include the trained predictors in the result. 
- executor : + parallel_backend : parallel backend, optional Returns ----------- pandas.DataFrame ''' - if executor is None: - executor = get_default_executor() + if parallel_backend is None: + parallel_backend = get_default_backend() if cartesian_product_of_folds_and_models: tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task)) @@ -303,8 +303,7 @@ def train_across_models_and_folds( [folds[i] for i in fold_nums], return_predictor=return_predictors) - task_results = map_throw_fast( - executor, + task_results = parallel_backend.map( train_and_test_one_model_task, task_model_and_fold_indices) diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py index 73e9cfce..d22473bb 100644 --- a/mhcflurry/parallelism.py +++ b/mhcflurry/parallelism.py @@ -1,34 +1,72 @@ from concurrent import futures import logging -DEFAULT_EXECUTOR = None +DEFAULT_BACKEND = None -def set_default_executor(executor): - global DEFAULT_EXECUTOR - DEFAULT_EXECUTOR = executor +class ParallelBackend(object): + def __init__(self, executor, module, verbose=1): + self.executor = executor + self.module = module + self.verbose = verbose + def submit(self, func, *args, **kwargs): + if self.verbose > 0: + logging.debug("Submitting: %s %s %s" % (func, args, kwargs)) + return self.executor.submit(func, *args, **kwargs) -def get_default_executor(): - global DEFAULT_EXECUTOR - if DEFAULT_EXECUTOR is None: - DEFAULT_EXECUTOR = futures.ThreadPoolExecutor(max_workers=1) - return DEFAULT_EXECUTOR + def map(self, func, iterable): + fs = [ + self.executor.submit(func, arg) for arg in iterable + ] + return self.wait(fs) + def wait(self, fs): + result_dict = {} + for finished_future in self.module.as_completed(fs): + result = finished_future.result() + logging.info("%3d / %3d tasks completed" % ( + len(result_dict), len(fs))) + result_dict[finished_future] = result -def map_throw_fast(executor, func, iterable): - futures = [ - executor.submit(func, arg) for arg in iterable - ] - return wait_all_throw_fast(futures) + return [result_dict[future] for future in fs] -def wait_all_throw_fast(fs): - result_dict = {} - for finished_future in futures.as_completed(fs): - result = finished_future.result() - logging.info("%3d / %3d tasks completed" % ( - len(result_dict), len(fs))) - result_dict[finished_future] = result +class DaskDistributedParallelBackend(ParallelBackend): + def __init__(self, scheduler_ip_and_port, verbose=1): + from dask import distributed + executor = distributed.Executor(scheduler_ip_and_port) + ParallelBackend.__init__(self, executor, distributed, verbose=verbose) + self.scheduler_ip_and_port = scheduler_ip_and_port - return [result_dict[future] for future in fs] + def __str__(self): + return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % ( + self.scheduler_ip_and_port, + sum(self.executor.ncores().values())) + + +class ConcurrentFuturesParallelBackend(ParallelBackend): + def __init__(self, num_workers=1, processes=False, verbose=1): + if processes: + executor = futures.ProcessPoolExecutor(num_workers) + else: + executor = futures.ThreadPoolExecutor(num_workers) + ParallelBackend.__init__(self, executor, futures, verbose=verbose) + self.num_workers = num_workers + self.processes = processes + + def __str__(self): + return "<Concurrent futures %s parallel backend, num workers = %d>" % ( + ("processes" if self.processes else "threads"), self.num_workers) + + +def set_default_backend(backend): + global DEFAULT_BACKEND + DEFAULT_BACKEND = backend + + +def get_default_backend(): + global 
DEFAULT_BACKEND + if DEFAULT_BACKEND is None: + set_default_backend(ConcurrentFuturesParallelBackend()) + return DEFAULT_BACKEND diff --git a/test/test_class1_allele_specific_cv_and_train_command.py b/test/test_class1_allele_specific_cv_and_train_command.py index a439b515..fdc123be 100644 --- a/test/test_class1_allele_specific_cv_and_train_command.py +++ b/test/test_class1_allele_specific_cv_and_train_command.py @@ -60,6 +60,7 @@ def test_small_run(): "--cv-num-folds", "2", "--alleles", "HLA-A0201", "HLA-A0301", "--verbose", + "--num-local-threads", "2", ] print("Running cv_and_train_command with args: %s " % str(args)) -- GitLab
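
A minimal usage sketch of the `ParallelBackend` API this patch introduces, assuming `mhcflurry.parallelism` is importable as laid out above; the `square` workload is a made-up example, not part of the patch.

```python
from mhcflurry import parallelism

def square(x):
    return x * x

# Thread-based backend; pass processes=True to back it with a
# ProcessPoolExecutor instead of a ThreadPoolExecutor.
backend = parallelism.ConcurrentFuturesParallelBackend(num_workers=2, processes=False)
parallelism.set_default_backend(backend)

# Library code (cross_validation_folds, train_across_models_and_folds)
# retrieves the shared backend rather than a raw executor.
backend = parallelism.get_default_backend()

# map() submits one task per element and returns results in input order.
print(backend.map(square, [1, 2, 3]))   # [1, 4, 9]

# submit() mirrors concurrent.futures.Executor.submit and returns a future.
future = backend.submit(square, 10)
print(future.result())                  # 100
```

If no backend is set, `get_default_backend()` falls back to a single-worker thread backend, and `DaskDistributedParallelBackend("HOST:PORT")` can be substituted to run the same `submit`/`map` calls against a dask distributed scheduler.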