Commit fc26e687 authored by Tim O'Donnell

Switch to using parallel_backend.map instead of parallel_backend.submit, to be compatible with kubeface

parent bc78f82f
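
Editor's note on the motivation: submit-style APIs hand back one future per task, which the caller resolves later, while map-style APIs take the whole iterable up front and return results directly. The commit message implies that kubeface exposes a map-like entry point rather than per-task futures, which would make map the lowest common denominator across backends. A minimal sketch of the two calling conventions, using plain concurrent.futures rather than the project's backend classes:

    from concurrent.futures import ThreadPoolExecutor

    def square(x):
        return x * x

    executor = ThreadPoolExecutor(max_workers=2)

    # submit-style: one future per task; the caller gathers results later.
    futures = [executor.submit(square, x) for x in range(4)]
    results_from_submit = [f.result() for f in futures]

    # map-style: pass the whole iterable at once and get results back
    # directly, in input order. This is the only entry point a map-only
    # backend needs to provide.
    results_from_map = list(executor.map(square, range(4)))

    assert results_from_submit == results_from_map == [0, 1, 4, 9]
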
@@ -142,6 +142,7 @@ def cross_validation_folds(
     alleles = train_data.unique_alleles()
     result_folds = []
+    imputation_args = []
     for allele in alleles:
         logging.info("Allele: %s" % allele)
         cv_iter = train_data.cross_validation_iterator(
@@ -165,27 +166,30 @@ def cross_validation_folds(
             test_split = full_test_split
         if imputer is not None:
-            imputation_future = parallel_backend.submit(
-                impute_and_select_allele,
-                all_allele_train_split,
-                imputer=imputer,
-                allele=allele,
-                **impute_kwargs)
-        else:
-            imputation_future = None
+            base_args = dict(impute_kwargs)
+            base_args.update(dict(
+                dataset=all_allele_train_split,
+                imputer=imputer,
+                allele=allele))
+            imputation_args.append(base_args)
         train_split = all_allele_train_split.get_allele(allele)
         fold = AlleleSpecificTrainTestFold(
+            imputed_train=None,  # updated later
             allele=allele,
             train=train_split,
-            imputed_train=imputation_future,
             test=test_split)
         result_folds.append(fold)
-    return [
-        result_fold._replace(imputed_train=(
-            result_fold.imputed_train.result()
-            if result_fold.imputed_train is not None
-            else None))
-        for result_fold in result_folds
-    ]
+    if imputation_args:
+        assert len(imputation_args) == len(result_folds)
+        imputation_results = parallel_backend.map(
+            lambda kwargs: impute_and_select_allele(**kwargs),
+            imputation_args)
+
+        return [
+            result_fold._replace(imputed_train=imputation_result)
+            for (result_fold, imputation_result) in zip(
+                result_folds, imputation_results)
+        ]
+    return result_folds
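
Editor's note: the new control flow queues keyword-argument dicts during the loop and issues a single map call at the end, unpacking each dict with a lambda because map delivers one argument per task. A hedged sketch of the same pattern, with a hypothetical work() function standing in for impute_and_select_allele:

    def work(dataset=None, allele=None):
        return (allele, len(dataset))

    pending_kwargs = []
    for allele in ["HLA-A0201", "HLA-B0702"]:
        pending_kwargs.append(dict(dataset=[1, 2, 3], allele=allele))

    # One map call over the queued argument dicts; the lambda unpacks each
    # dict since map passes a single argument per task.
    results = list(map(lambda kwargs: work(**kwargs), pending_kwargs))
    assert results == [("HLA-A0201", 3), ("HLA-B0702", 3)]

One caveat: a lambda only survives serialization if the backend pickles with something like cloudpickle; with a plain multiprocessing-based executor, a module-level function is the safer choice.
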
@@ -314,23 +314,24 @@ def go(args):
     logging.info("")
     train_folds = []
     train_models = []
+    imputation_args_list = []
+    best_architectures = []
     for (allele_num, allele) in enumerate(cv_results.allele.unique()):
         best_index = best_architectures_by_allele[allele]
         architecture = model_architectures[best_index]
+        best_architectures.append(architecture)
         train_models.append(architecture)
         logging.info(
             "Allele: %s best architecture is index %d: %s" %
             (allele, best_index, architecture))
         if architecture['impute']:
-            imputation_future = backend.submit(
-                impute_and_select_allele,
-                train_data,
-                imputer=imputer,
-                allele=allele,
-                **impute_kwargs)
-        else:
-            imputation_future = None
+            imputation_args = dict(impute_kwargs)
+            imputation_args.update(dict(
+                dataset=train_data,
+                imputer=imputer,
+                allele=allele))
+            imputation_args_list.append(imputation_args)
         test_data_this_allele = None
         if test_data is not None:
@@ -338,25 +339,26 @@ def go(args):
         fold = AlleleSpecificTrainTestFold(
             allele=allele,
             train=train_data.get_allele(allele),
-            # Here we set imputed_train to the imputation *task* if
-            # imputation was used on this fold. We set this to the actual
-            # imputed training dataset a few lines farther down. This
-            # complexity is because we want to be able to parallelize
-            # the imputations so we have to queue up the tasks first.
-            # If we are not doing imputation then the imputation_task
-            # is None.
-            imputed_train=imputation_future,
+            imputed_train=None,
             test=test_data_this_allele)
         train_folds.append(fold)
-    train_folds = [
-        result_fold._replace(imputed_train=(
-            result_fold.imputed_train.result()
-            if result_fold.imputed_train is not None
-            else None))
-        for result_fold in train_folds
-    ]
+    if imputation_args_list:
+        imputation_results = list(backend.map(
+            lambda kwargs: impute_and_select_allele(**kwargs),
+            imputation_args_list))
+
+        new_train_folds = []
+        for (best_architecture, train_fold) in zip(
+                best_architectures, train_folds):
+            imputed_train = None
+            if best_architecture['impute']:
+                imputed_train = imputation_results.pop(0)
+            new_train_folds.append(
+                train_fold._replace(imputed_train=imputed_train))
+        assert not imputation_results
+        train_folds = new_train_folds
     logging.info("Training %d production models" % len(train_folds))
     start = time.time()
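
Editor's note: only folds whose best architecture has impute set to True contribute an entry to imputation_args_list, so the results list is shorter than train_folds, and backend.map is expected to return results in submission order. Popping from the front while walking the folds in the same order therefore re-aligns each result with the fold that queued it. A small self-contained sketch of that merge step, with hypothetical names:

    items = [("a", True), ("b", False), ("c", True)]
    queued = [name for (name, needs_work) in items if needs_work]
    results = [name.upper() for name in queued]  # stands in for backend.map(...)

    merged = []
    for (name, needs_work) in items:
        # Results arrive in submission order, so popping from the front
        # keeps each result paired with the item that queued it.
        result = results.pop(0) if needs_work else None
        merged.append((name, result))
    assert not results  # every queued result was consumed
    assert merged == [("a", "A"), ("b", None), ("c", "C")]
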
...
@@ -14,28 +14,6 @@ class ParallelBackend(object):
         self.module = module
         self.verbose = verbose
 
-    def submit(self, func, *args, **kwargs):
-        if self.verbose > 0:
-            logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
-        return self.executor.submit(func, *args, **kwargs)
-
-    def map(self, func, iterable):
-        fs = [
-            self.executor.submit(func, arg) for arg in iterable
-        ]
-        return self.wait(fs)
-
-    def wait(self, fs):
-        result_dict = {}
-        for finished_future in self.module.as_completed(fs):
-            result = finished_future.result()
-            logging.info("%3d / %3d tasks completed" % (
-                len(result_dict), len(fs)))
-            result_dict[finished_future] = result
-        return [result_dict[future] for future in fs]
-
 
 class KubefaceParallelBackend(ParallelBackend):
     """
     ParallelBackend that uses kubeface
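
Editor's note: removing submit, map, and wait from ParallelBackend leaves the base class as a thin holder of executor state, with each subclass supplying whatever map implementation its execution model supports. The apparent point, given the commit message, is that a kubeface-style backend can implement map directly without ever materializing future objects. A hedged sketch of such a map-only backend (hypothetical class; serial evaluation stands in for remote dispatch):

    class MapOnlyBackend(object):
        def map(self, func, iterable):
            # A real backend would ship (func, arg) pairs to workers and
            # block until all results return; plain iteration stands in
            # for that here.
            return [func(arg) for arg in iterable]

    backend = MapOnlyBackend()
    assert backend.map(len, ["a", "bb", "ccc"]) == [1, 2, 3]
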
@@ -61,6 +39,22 @@ class DaskDistributedParallelBackend(ParallelBackend):
         ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
         self.scheduler_ip_and_port = scheduler_ip_and_port
 
+    def map(self, func, iterable):
+        fs = [
+            self.executor.submit(func, arg) for arg in iterable
+        ]
+        return self.wait(fs)
+
+    def wait(self, fs):
+        result_dict = {}
+        for finished_future in self.module.as_completed(fs):
+            result = finished_future.result()
+            logging.info("%3d / %3d tasks completed" % (
+                len(result_dict), len(fs)))
+            result_dict[finished_future] = result
+        return [result_dict[future] for future in fs]
+
     def __str__(self):
         return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
             self.scheduler_ip_and_port,
@@ -85,6 +79,22 @@ class ConcurrentFuturesParallelBackend(ParallelBackend):
         return "<Concurrent futures %s parallel backend, num workers = %d>" % (
             ("processes" if self.processes else "threads"), self.num_workers)
 
+    def map(self, func, iterable):
+        fs = [
+            self.executor.submit(func, arg) for arg in iterable
+        ]
+        return self.wait(fs)
+
+    def wait(self, fs):
+        result_dict = {}
+        for finished_future in self.module.as_completed(fs):
+            result = finished_future.result()
+            logging.info("%3d / %3d tasks completed" % (
+                len(result_dict), len(fs)))
+            result_dict[finished_future] = result
+        return [result_dict[future] for future in fs]
+
 
 def set_default_backend(backend):
     global DEFAULT_BACKEND
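
Editor's note: the map and wait bodies added to the two future-based subclasses are identical. The wait helper logs progress as tasks finish via as_completed, but returns results in submission order by indexing the collected dict with the original futures list. If the duplication grows, one option would be a shared mixin; a hedged sketch under that assumption (hypothetical class name; it expects the host class to provide the executor and module attributes, and logs after the dict insert so the counter starts at 1):

    import logging

    class FutureBackendMixin(object):
        def map(self, func, iterable):
            fs = [self.executor.submit(func, arg) for arg in iterable]
            return self.wait(fs)

        def wait(self, fs):
            result_dict = {}
            for finished_future in self.module.as_completed(fs):
                result_dict[finished_future] = finished_future.result()
                logging.info("%3d / %3d tasks completed" % (
                    len(result_dict), len(fs)))
            # Index by the original futures so results come back in
            # submission order, not completion order.
            return [result_dict[future] for future in fs]
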
...