Skip to content
Snippets Groups Projects
Commit c6f884c5 authored by Tim O'Donnell's avatar Tim O'Donnell
Browse files

reorganize parallel code using ParallelBackend class

parent d5f9f405
No related branches found
No related tags found
No related merge requests found
...@@ -3,13 +3,13 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS ...@@ -3,13 +3,13 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
import json

# Hyperparameter grid: models_grid yields one model specification per
# combination of the listed values. Imputation is disabled for this run.
models = HYPERPARAMETER_DEFAULTS.models_grid(
    impute=[False],
    activation=["tanh"],
    layer_sizes=[[12], [64], [128]],
    embedding_output_dim=[8, 32, 64],
    dropout_probability=[0, .1, .25],
    n_training_epochs=[250])

sys.stderr.write("Models: %d\n" % len(models))
......
...@@ -20,11 +20,10 @@ from __future__ import ( ...@@ -20,11 +20,10 @@ from __future__ import (
import collections import collections
import logging import logging
import pepdata import pepdata
from .train import impute_and_select_allele, AlleleSpecificTrainTestFold from .train import impute_and_select_allele, AlleleSpecificTrainTestFold
from ..parallelism import get_default_executor from ..parallelism import get_default_backend
gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4") gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4")
...@@ -100,7 +99,7 @@ def cross_validation_folds( ...@@ -100,7 +99,7 @@ def cross_validation_folds(
'min_observations_per_peptide': 2, 'min_observations_per_peptide': 2,
'min_observations_per_allele': 2, 'min_observations_per_allele': 2,
}, },
executor=None): parallel_backend=None):
''' '''
Split a Dataset into n_folds cross validation folds for each allele, Split a Dataset into n_folds cross validation folds for each allele,
optionally performing imputation. optionally performing imputation.
...@@ -136,8 +135,8 @@ def cross_validation_folds( ...@@ -136,8 +135,8 @@ def cross_validation_folds(
list of AlleleSpecificTrainTestFold of length num alleles * n_folds list of AlleleSpecificTrainTestFold of length num alleles * n_folds
''' '''
if executor is None: if parallel_backend is None:
executor = get_default_executor() parallel_backend = get_default_backend()
if alleles is None: if alleles is None:
alleles = train_data.unique_alleles() alleles = train_data.unique_alleles()
...@@ -166,7 +165,7 @@ def cross_validation_folds( ...@@ -166,7 +165,7 @@ def cross_validation_folds(
test_split = full_test_split test_split = full_test_split
if imputer is not None: if imputer is not None:
imputation_future = executor.submit( imputation_future = parallel_backend.submit(
impute_and_select_allele, impute_and_select_allele,
all_allele_train_split, all_allele_train_split,
imputer=imputer, imputer=imputer,
......
...@@ -48,9 +48,7 @@ import pickle ...@@ -48,9 +48,7 @@ import pickle
import numpy import numpy
from dask import distributed from .. import parallelism
from ..parallelism import set_default_executor, get_default_executor
from ..dataset import Dataset from ..dataset import Dataset
from ..imputation_helpers import imputer_from_name from ..imputation_helpers import imputer_from_name
from .cross_validation import cross_validation_folds from .cross_validation import cross_validation_folds
...@@ -138,6 +136,19 @@ parser.add_argument( ...@@ -138,6 +136,19 @@ parser.add_argument(
metavar="HOST:PORT", metavar="HOST:PORT",
help="Host and port of dask distributed scheduler") help="Host and port of dask distributed scheduler")
parser.add_argument(
"--num-local-processes",
metavar="N",
type=int,
help="Processes (exclusive with --dask-scheduler and --num-local-threads)")
parser.add_argument(
"--num-local-threads",
metavar="N",
type=int,
default=1,
help="Threads (exclusive with --dask-scheduler and --num-local-processes)")
parser.add_argument( parser.add_argument(
"--min-samples-per-allele", "--min-samples-per-allele",
default=100, default=100,
...@@ -161,22 +172,36 @@ parser.add_argument( ...@@ -161,22 +172,36 @@ parser.add_argument(
def run(argv=sys.argv[1:]):
    """
    Entry point: parse command-line arguments, configure logging,
    install the requested parallel backend, and run the driver (go).

    Parameters
    ----------
    argv : list of string, optional
        Arguments to parse (defaults to the process arguments).
    """
    args = parser.parse_args(argv)

    if args.verbose:
        logging.root.setLevel(level="DEBUG")
    elif not args.quiet:
        logging.root.setLevel(level="INFO")

    logging.info("Running with arguments: %s" % args)

    # Fix: removed leftover debug statement print("Past logging").

    # Set parallel backend: a dask.distributed scheduler when one is given,
    # otherwise a local process pool or (default) thread pool.
    if args.dask_scheduler:
        backend = parallelism.DaskDistributedParallelBackend(
            args.dask_scheduler)
    elif args.num_local_processes:
        backend = parallelism.ConcurrentFuturesParallelBackend(
            args.num_local_processes,
            processes=True)
    else:
        backend = parallelism.ConcurrentFuturesParallelBackend(
            args.num_local_threads,
            processes=False)
    parallelism.set_default_backend(backend)
    logging.info("Using parallel backend: %s" % backend)

    go(args)
def go(args): def go(args):
executor = get_default_executor() backend = parallelism.get_default_backend()
model_architectures = json.loads(args.model_architectures.read()) model_architectures = json.loads(args.model_architectures.read())
logging.info("Read %d model architectures" % len(model_architectures)) logging.info("Read %d model architectures" % len(model_architectures))
...@@ -292,7 +317,7 @@ def go(args): ...@@ -292,7 +317,7 @@ def go(args):
(allele, best_index, architecture)) (allele, best_index, architecture))
if architecture['impute']: if architecture['impute']:
imputation_future = executor.submit( imputation_future = backend.submit(
impute_and_select_allele, impute_and_select_allele,
train_data, train_data,
imputer=imputer, imputer=imputer,
......
...@@ -31,7 +31,7 @@ import mhcflurry ...@@ -31,7 +31,7 @@ import mhcflurry
from .scoring import make_scores from .scoring import make_scores
from .class1_binding_predictor import Class1BindingPredictor from .class1_binding_predictor import Class1BindingPredictor
from ..hyperparameters import HyperparameterDefaults from ..hyperparameters import HyperparameterDefaults
from ..parallelism import get_default_executor, map_throw_fast from ..parallelism import get_default_backend
TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False) TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
...@@ -239,7 +239,7 @@ def train_across_models_and_folds( ...@@ -239,7 +239,7 @@ def train_across_models_and_folds(
cartesian_product_of_folds_and_models=True, cartesian_product_of_folds_and_models=True,
return_predictors=False, return_predictors=False,
folds_per_task=1, folds_per_task=1,
executor=None): parallel_backend=None):
''' '''
Train and optionally test any number of models across any number of folds. Train and optionally test any number of models across any number of folds.
...@@ -259,14 +259,14 @@ def train_across_models_and_folds( ...@@ -259,14 +259,14 @@ def train_across_models_and_folds(
return_predictors : boolean, optional return_predictors : boolean, optional
Include the trained predictors in the result. Include the trained predictors in the result.
executor : parallel_backend : parallel backend, optional
Returns Returns
----------- -----------
pandas.DataFrame pandas.DataFrame
''' '''
if executor is None: if parallel_backend is None:
executor = get_default_executor() parallel_backend = get_default_backend()
if cartesian_product_of_folds_and_models: if cartesian_product_of_folds_and_models:
tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task)) tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
...@@ -303,8 +303,7 @@ def train_across_models_and_folds( ...@@ -303,8 +303,7 @@ def train_across_models_and_folds(
[folds[i] for i in fold_nums], [folds[i] for i in fold_nums],
return_predictor=return_predictors) return_predictor=return_predictors)
task_results = map_throw_fast( task_results = parallel_backend.map(
executor,
train_and_test_one_model_task, train_and_test_one_model_task,
task_model_and_fold_indices) task_model_and_fold_indices)
......
from concurrent import futures from concurrent import futures
import logging import logging
# Process-wide default backend; managed by set/get_default_backend.
DEFAULT_BACKEND = None


class ParallelBackend(object):
    """
    Thin wrapper around a futures executor exposing submit / map / wait
    with progress logging.

    Parameters
    ----------
    executor : object
        Executor supporting submit(func, *args, **kwargs) returning a
        future (e.g. a concurrent.futures or dask.distributed executor).
    module : module
        Module providing as_completed(fs) for this executor's futures
        (concurrent.futures or dask.distributed).
    verbose : int, optional
        When > 0, each submitted task is logged at DEBUG level.
    """
    def __init__(self, executor, module, verbose=1):
        self.executor = executor
        self.module = module
        self.verbose = verbose

    def submit(self, func, *args, **kwargs):
        """Submit func(*args, **kwargs) to the executor; return a future."""
        if self.verbose > 0:
            logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
        return self.executor.submit(func, *args, **kwargs)

    def map(self, func, iterable):
        """
        Apply func to each element of iterable in parallel.

        Returns
        -------
        list of results in the input order. The first task exception
        encountered is raised as soon as that task completes.
        """
        fs = [
            self.executor.submit(func, arg) for arg in iterable
        ]
        return self.wait(fs)

    def wait(self, fs):
        """
        Wait for all futures in fs; return their results in the order
        of fs. Fails fast: result() re-raises a task's exception as soon
        as that task finishes.
        """
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            result = finished_future.result()
            # Fix: record the result BEFORE logging so progress reads
            # 1/N .. N/N instead of starting at 0/N (off-by-one).
            result_dict[finished_future] = result
            logging.info("%3d / %3d tasks completed" % (
                len(result_dict), len(fs)))

        return [result_dict[future] for future in fs]
class DaskDistributedParallelBackend(ParallelBackend):
    """
    ParallelBackend that submits work to a dask.distributed scheduler.

    Parameters
    ----------
    scheduler_ip_and_port : string
        Address of the running scheduler, e.g. "10.0.0.1:8786".
    verbose : int, optional
        Passed through to ParallelBackend.
    """
    def __init__(self, scheduler_ip_and_port, verbose=1):
        # Imported here so dask is required only when this backend is used.
        from dask import distributed
        ParallelBackend.__init__(
            self,
            distributed.Executor(scheduler_ip_and_port),
            distributed,
            verbose=verbose)
        self.scheduler_ip_and_port = scheduler_ip_and_port

    def __str__(self):
        total_cores = sum(self.executor.ncores().values())
        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
            self.scheduler_ip_and_port, total_cores)
class ConcurrentFuturesParallelBackend(ParallelBackend):
    """
    ParallelBackend backed by a local concurrent.futures pool of either
    worker processes or worker threads.

    Parameters
    ----------
    num_workers : int, optional
        Pool size (default 1).
    processes : boolean, optional
        Use a process pool when True, a thread pool when False (default).
    verbose : int, optional
        Passed through to ParallelBackend.
    """
    def __init__(self, num_workers=1, processes=False, verbose=1):
        pool_class = (
            futures.ProcessPoolExecutor if processes
            else futures.ThreadPoolExecutor)
        ParallelBackend.__init__(
            self, pool_class(num_workers), futures, verbose=verbose)
        self.num_workers = num_workers
        self.processes = processes

    def __str__(self):
        kind = "processes" if self.processes else "threads"
        return "<Concurrent futures %s parallel backend, num workers = %d>" % (
            kind, self.num_workers)
def set_default_backend(backend):
    """Install `backend` as the process-wide default parallel backend."""
    global DEFAULT_BACKEND
    DEFAULT_BACKEND = backend
def get_default_backend():
    """
    Return the process-wide default parallel backend.

    On first use (no backend installed yet), lazily creates and installs
    a ConcurrentFuturesParallelBackend with its default settings.
    """
    global DEFAULT_BACKEND
    if DEFAULT_BACKEND is None:
        set_default_backend(ConcurrentFuturesParallelBackend())
    return DEFAULT_BACKEND
...@@ -60,6 +60,7 @@ def test_small_run(): ...@@ -60,6 +60,7 @@ def test_small_run():
"--cv-num-folds", "2", "--cv-num-folds", "2",
"--alleles", "HLA-A0201", "HLA-A0301", "--alleles", "HLA-A0201", "HLA-A0301",
"--verbose", "--verbose",
"--num-local-threads", "2",
] ]
print("Running cv_and_train_command with args: %s " % str(args)) print("Running cv_and_train_command with args: %s " % str(args))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment