diff --git a/downloads-generation/models_class1_allele_specific_single/README.md b/downloads-generation/models_class1_allele_specific_single/README.md
index 58b6880f496aadb9e36f5ab6fdd434ae2feee9f7..0003e3cd699e1122401f2797060815a7ab768d00 100644
--- a/downloads-generation/models_class1_allele_specific_single/README.md
+++ b/downloads-generation/models_class1_allele_specific_single/README.md
@@ -2,19 +2,20 @@
 This download contains trained MHC Class I allele-specific MHCflurry models. The training data used is in the [data_combined_iedb_kim2014](../data_combined_iedb_kim2014) MHCflurry download. We first select network hyperparameters for each allele individually using cross validation over the models enumerated in [models.py](models.py). The best hyperparameter settings are selected via average of AUC (at 500nm), F1, and Kendall's Tau over the training folds. We then train the production models over the full training set using the selected hyperparameters.
 
-The training script supports multi-node parallel execution using the [dask-distributed](https://distributed.readthedocs.io/en/latest/) library. To enable this, pass the IP and port of the dask scheduler to the training script with the '--dask-scheduler' option. The GENERATE.sh script passes all arguments to the training script so you can just give it as an argument to GENERATE.sh.
+The training script supports multi-node parallel execution using the [kubeface](https://github.com/hammerlab/kubeface) library.
 
-We run dask distributed on Google Container Engine using Kubernetes as described [here](https://github.com/hammerlab/dask-distributed-on-kubernetes).
+To use kubeface, create a Google Storage bucket and pass it to the training script with the --storage-prefix argument. The GENERATE.sh script passes its arguments through to the training script, so the kubeface options can be given directly to GENERATE.sh, as in the example below.
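+
+For example, the bucket used in the invocation below can be created with `gsutil mb gs://kubeface` (the bucket name is only an illustration; substitute a bucket you control).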
 
 To generate this download we run:
 
 ```
-# If you are running dask distributed using our kubernetes config, you can use the DASK_IP one liner below.
-# Otherwise, just set it to the IP of the dask scheduler.
-DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
 ./GENERATE.sh \
-    --joblib-num-jobs 100 \
-    --joblib-pre-dispatch all \
     --cv-folds-per-task 10 \
-    --dask-scheduler $DASK_IP:8786
+    --backend kubernetes \
+    --storage-prefix gs://kubeface \
+    --worker-image hammerlab/mhcflurry:latest \
+    --kubernetes-task-resources-memory-mb 10000 \
+    --worker-path-prefix venv-py3/bin \
+    --max-simultaneous-tasks 200
 ```
diff --git a/downloads-generation/models_class1_allele_specific_single/models.py b/downloads-generation/models_class1_allele_specific_single/models.py
index 6375cd4510bfebadd4df529a6884c1eb1632f162..30f8e3d5e0a5cbdc3426147d118d3fd22edae304 100644
--- a/downloads-generation/models_class1_allele_specific_single/models.py
+++ b/downloads-generation/models_class1_allele_specific_single/models.py
@@ -3,13 +3,12 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
 import json
 
 models = HYPERPARAMETER_DEFAULTS.models_grid(
-    #impute=[False, True],
-    impute=[False],
+    impute=[False, True],
     activation=["tanh"],
     layer_sizes=[[12], [64], [128]],
     embedding_output_dim=[8, 32, 64],
     dropout_probability=[0, .1, .25],
-    # fraction_negative=[0, .1, .2],
+    fraction_negative=[0, .1, .2],
     n_training_epochs=[250])
 
 sys.stderr.write("Models: %d\n" % len(models))
diff --git a/mhcflurry/class1_allele_specific/cross_validation.py b/mhcflurry/class1_allele_specific/cross_validation.py
index 5ceeb4b729e6f13c8dd9fa2ff4d5e12839ace6e1..1bf0038fdde8f9ab2a6808693bc18d0e93fdba75 100644
--- a/mhcflurry/class1_allele_specific/cross_validation.py
+++ b/mhcflurry/class1_allele_specific/cross_validation.py
@@ -142,6 +142,7 @@ def cross_validation_folds(
     alleles = train_data.unique_alleles()
 
     result_folds = []
+    imputation_args = []
     for allele in alleles:
         logging.info("Allele: %s" % allele)
         cv_iter = train_data.cross_validation_iterator(
@@ -165,27 +166,33 @@ def cross_validation_folds(
             test_split = full_test_split
 
         if imputer is not None:
-            imputation_future = parallel_backend.submit(
-                impute_and_select_allele,
-                all_allele_train_split,
+            base_args = dict(impute_kwargs)
+            base_args.update(dict(
+                dataset=all_allele_train_split,
                 imputer=imputer,
-                allele=allele,
-                **impute_kwargs)
-        else:
-            imputation_future = None
+                allele=allele))
+            imputation_args.append(base_args)
 
         train_split = all_allele_train_split.get_allele(allele)
 
         fold = AlleleSpecificTrainTestFold(
+            imputed_train=None,  # updated later
             allele=allele,
             train=train_split,
-            imputed_train=imputation_future,
             test=test_split)
 
         result_folds.append(fold)
 
-    return [
-        result_fold._replace(imputed_train=(
-            result_fold.imputed_train.result()
-            if result_fold.imputed_train is not None
-            else None))
-        for result_fold in result_folds
-    ]
+    if imputation_args:
+        assert len(imputation_args) == len(result_folds)
+        imputation_results = parallel_backend.map(
+            lambda kwargs: impute_and_select_allele(**kwargs),
+            imputation_args)
+
+        # Here _replace is a method on named tuples that returns a new named
+        # tuple with the specified key set to the given value and all other
+        # key/values the same as the original.
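+        # For instance (purely illustrative): if p = Point(x=1, y=2) is a
+        # namedtuple instance, then p._replace(y=5) returns Point(x=1, y=5)
+        # and leaves p itself unchanged.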
+        return [
+            result_fold._replace(imputed_train=imputation_result)
+            for (result_fold, imputation_result) in zip(
+                result_folds, imputation_results)
+        ]
+    return result_folds
diff --git a/mhcflurry/class1_allele_specific/cv_and_train_command.py b/mhcflurry/class1_allele_specific/cv_and_train_command.py
index 075e8e3761c5421efc3f7f6d2f30ed30d2e8b565..dac1c4e1a5f02db16ef0e9eb5907a2db80cd04e2 100644
--- a/mhcflurry/class1_allele_specific/cv_and_train_command.py
+++ b/mhcflurry/class1_allele_specific/cv_and_train_command.py
@@ -169,6 +169,12 @@ parser.add_argument(
     default=False,
     help="Output more info")
 
+try:
+    import kubeface
+    kubeface.Client.add_args(parser)
+except ImportError:
+    logging.error("Kubeface support disabled, not installed.")
+
 
 def run(argv=sys.argv[1:]):
     args = parser.parse_args(argv)
@@ -183,6 +189,8 @@ def run(argv=sys.argv[1:]):
     if args.dask_scheduler:
         backend = parallelism.DaskDistributedParallelBackend(
             args.dask_scheduler)
+    elif hasattr(args, 'storage_prefix') and args.storage_prefix:
+        backend = parallelism.KubefaceParallelBackend(args)
     else:
         if args.num_local_processes:
             backend = parallelism.ConcurrentFuturesParallelBackend(
@@ -306,23 +314,24 @@ def go(args):
     logging.info("")
 
     train_folds = []
     train_models = []
+    imputation_args_list = []
+    best_architectures = []
     for (allele_num, allele) in enumerate(cv_results.allele.unique()):
         best_index = best_architectures_by_allele[allele]
         architecture = model_architectures[best_index]
+        best_architectures.append(architecture)
         train_models.append(architecture)
         logging.info(
             "Allele: %s best architecture is index %d: %s" % (
                 allele, best_index, architecture))
 
         if architecture['impute']:
-            imputation_future = backend.submit(
-                impute_and_select_allele,
-                train_data,
+            imputation_args = dict(impute_kwargs)
+            imputation_args.update(dict(
+                dataset=train_data,
                 imputer=imputer,
-                allele=allele,
-                **impute_kwargs)
-        else:
-            imputation_future = None
+                allele=allele))
+            imputation_args_list.append(imputation_args)
 
         test_data_this_allele = None
         if test_data is not None:
@@ -330,25 +339,26 @@
         fold = AlleleSpecificTrainTestFold(
             allele=allele,
             train=train_data.get_allele(allele),
-
-            # Here we set imputed_train to the imputation *task* if
-            # imputation was used on this fold. We set this to the actual
-            # imputed training dataset a few lines farther down. This
-            # complexity is because we want to be able to parallelize
-            # the imputations so we have to queue up the tasks first.
-            # If we are not doing imputation then the imputation_task
-            # is None.
-            imputed_train=imputation_future,
+            imputed_train=None,  # filled in below once imputation tasks finish
             test=test_data_this_allele)
 
         train_folds.append(fold)
 
-    train_folds = [
-        result_fold._replace(imputed_train=(
-            result_fold.imputed_train.result()
-            if result_fold.imputed_train is not None
-            else None))
-        for result_fold in train_folds
-    ]
+    if imputation_args_list:
+        imputation_results = list(backend.map(
+            lambda kwargs: impute_and_select_allele(**kwargs),
+            imputation_args_list))
+
+        new_train_folds = []
+        for (best_architecture, train_fold) in zip(
+                best_architectures, train_folds):
+            imputed_train = None
+            if best_architecture['impute']:
+                imputed_train = imputation_results.pop(0)
+            new_train_folds.append(
+                train_fold._replace(imputed_train=imputed_train))
+        assert not imputation_results
+
+        train_folds = new_train_folds
 
     logging.info("Training %d production models" % len(train_folds))
     start = time.time()
diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py
index 18008b4e9057c3b4e0c3b9c6d1a2db81522beb5f..faecc871469210058ebb11f3323ae0a1ccdc20ce 100644
--- a/mhcflurry/parallelism.py
+++ b/mhcflurry/parallelism.py
@@ -14,10 +14,31 @@ class ParallelBackend(object):
         self.module = module
         self.verbose = verbose
 
-    def submit(self, func, *args, **kwargs):
-        if self.verbose > 0:
-            logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
-        return self.executor.submit(func, *args, **kwargs)
+
+class KubefaceParallelBackend(ParallelBackend):
+    """
+    ParallelBackend that uses kubeface
+    """
+    def __init__(self, args):
+        from kubeface import Client  # pylint: disable=import-error
+        self.client = Client.from_args(args)
+
+    def map(self, func, iterable):
+        return self.client.map(func, iterable)
+
+    def __str__(self):
+        return "<Kubeface backend, client=%s>" % self.client
+
+
+class DaskDistributedParallelBackend(ParallelBackend):
+    """
+    ParallelBackend that uses dask.distributed
+    """
+    def __init__(self, scheduler_ip_and_port, verbose=1):
+        from dask import distributed  # pylint: disable=import-error
+        executor = distributed.Executor(scheduler_ip_and_port)
+        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
+        self.scheduler_ip_and_port = scheduler_ip_and_port
 
     def map(self, func, iterable):
         fs = [
@@ -35,17 +56,6 @@
         return [result_dict[future] for future in fs]
 
-
-class DaskDistributedParallelBackend(ParallelBackend):
-    """
-    ParallelBackend that uses dask.distributed
-    """
-    def __init__(self, scheduler_ip_and_port, verbose=1):
-        from dask import distributed  # pylint: disable=import-error
-        executor = distributed.Executor(scheduler_ip_and_port)
-        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
-        self.scheduler_ip_and_port = scheduler_ip_and_port
-
     def __str__(self):
         return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
             self.scheduler_ip_and_port,
@@ -70,6 +80,22 @@ class ConcurrentFuturesParallelBackend(ParallelBackend):
         return "<Concurrent futures %s parallel backend, num workers = %d>" % (
             ("processes" if self.processes else "threads"), self.num_workers)
 
+    def map(self, func, iterable):
+        fs = [
+            self.executor.submit(func, arg) for arg in iterable
+        ]
+        return self.wait(fs)
+
+    def wait(self, fs):
+        result_dict = {}
+        for finished_future in self.module.as_completed(fs):
+            result = finished_future.result()
+            logging.info("%3d / %3d tasks completed" % (
+                len(result_dict), len(fs)))
+            result_dict[finished_future] = result
+
+        return [result_dict[future] for future in fs]
+
 
 def set_default_backend(backend):
     global DEFAULT_BACKEND
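For orientation, the map-based interface that the backends above now share can be exercised with the local concurrent.futures backend. This is only an illustrative sketch, not part of the patch; it assumes `ConcurrentFuturesParallelBackend` accepts a `num_workers` argument and defaults to threads, as in the existing class:

```
from mhcflurry import parallelism

# Local thread-based backend. map() submits one task per element and, as the
# wait() implementation above shows, returns results in the order of the
# input iterable even though tasks may finish out of order.
backend = parallelism.ConcurrentFuturesParallelBackend(num_workers=2)

squares = backend.map(lambda x: x * x, [1, 2, 3, 4])
assert squares == [1, 4, 9, 16]
```
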
diff --git a/test/test_class1_allele_specific_cv_and_train_command.py b/test/test_class1_allele_specific_cv_and_train_command.py
index f6e52d87a4aa73ba6555f4d0c86c493b58a48a85..bcc71db1e63e8f35e0e5be41f907943c1d7d650a 100644
--- a/test/test_class1_allele_specific_cv_and_train_command.py
+++ b/test/test_class1_allele_specific_cv_and_train_command.py
@@ -13,6 +13,12 @@ from mhcflurry.class1_allele_specific import cv_and_train_command
 from mhcflurry import downloads, predict
 from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
 
+try:
+    import kubeface
+    KUBEFACE_INSTALLED = True
+except ImportError:
+    KUBEFACE_INSTALLED = False
+
 
 def test_small_run():
     base_temp_dir = tempfile.mkdtemp()
@@ -62,6 +68,11 @@ def test_small_run():
         "--verbose",
         "--num-local-threads", "1",
     ]
+    if KUBEFACE_INSTALLED:
+        # If kubeface is installed, this command will use it by default.
+        # In that case we want the kubeface storage written to a local
+        # path rather than assuming a Google Storage bucket exists.
+        args.extend(["--storage-prefix", "/tmp/"])
     print("Running cv_and_train_command with args: %s " % str(args))
     cv_and_train_command.run(args)
 
diff --git a/test/test_cross_validation.py b/test/test_cross_validation.py
index cf95333b81c993ef0282af397abd85658f737d8b..c0d4296e815dad534d8cb8f46b799dd0343f9f69 100644
--- a/test/test_cross_validation.py
+++ b/test/test_cross_validation.py
@@ -77,7 +77,7 @@ def test_cross_validation_with_imputation():
         n_imputations=2, n_burn_in=1, n_nearest_columns=25)
     train_data = (
         mhcflurry.dataset.Dataset.from_csv(
-            get_path("data_kim2014" , "bdata.2009.mhci.public.1.txt"))
+            get_path("data_kim2014", "bdata.2009.mhci.public.1.txt"))
         .get_alleles(["HLA-A0201", "HLA-A0202", "HLA-A0301"]))
 
     folds = cross_validation_folds(
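Finally, the optional-kubeface wiring that the command and test changes above rely on can be summarized in a small, self-contained sketch. The only kubeface calls used are the ones appearing in this diff (`Client.add_args` and `Client.from_args`); the argument values, log message, and print are illustrative:

```
import argparse
import logging

from mhcflurry import parallelism

parser = argparse.ArgumentParser()

argv = []
try:
    # Optional dependency: when installed, kubeface registers its own
    # options (including --storage-prefix) on the parser.
    import kubeface
    kubeface.Client.add_args(parser)
    # Local-path storage, as in the test above, instead of a gs:// bucket.
    argv = ["--storage-prefix", "/tmp/"]
except ImportError:
    logging.info("kubeface not installed; local backends only")

args = parser.parse_args(argv)

# Mirrors the backend selection in cv_and_train_command.run(): kubeface is
# used only when it is installed and a storage prefix was given.
if getattr(args, "storage_prefix", None):
    backend = parallelism.KubefaceParallelBackend(args)
    print(backend)  # e.g. "<Kubeface backend, client=...>"
```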