Commit 8f08993d authored by Tim O'Donnell, committed by GitHub

Merge pull request #81 from hammerlab/kubeface-model-training

Kubeface model training
parents 41c0c018 9d5d36b0
......@@ -2,19 +2,20 @@
This download contains trained MHC Class I allele-specific MHCflurry models. The training data used is in the [data_combined_iedb_kim2014](../data_combined_iedb_kim2014) MHCflurry download. We first select network hyperparameters for each allele individually using cross validation over the models enumerated in [models.py](models.py). The best hyperparameter settings are selected by averaging AUC (at the 500 nM binder threshold), F1, and Kendall's Tau over the training folds. We then train the production models on the full training set using the selected hyperparameters.
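As a concrete (hypothetical) illustration of that model-selection score, the sketch below averages AUC at the 500 nM binder threshold, F1, and Kendall's Tau for one allele's held-out predictions; the helper name and exact weighting are assumptions, not the MHCflurry implementation.
```
# Illustrative sketch: combine AUC (500 nM binder threshold), F1, and
# Kendall's Tau into one model-selection score. Assumed helper, not the
# actual MHCflurry code.
import numpy as np
from scipy.stats import kendalltau
from sklearn.metrics import f1_score, roc_auc_score

def selection_score(true_ic50, predicted_ic50, threshold=500.0):
    true_binder = true_ic50 <= threshold
    predicted_binder = predicted_ic50 <= threshold
    auc = roc_auc_score(true_binder, -predicted_ic50)  # lower IC50 = binder
    f1 = f1_score(true_binder, predicted_binder)
    tau = kendalltau(true_ic50, predicted_ic50).correlation
    return np.mean([auc, f1, tau])
```
The hyperparameter setting with the highest average score across an allele's folds would then be used for that allele's production model.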
The training script supports multi-node parallel execution using the [dask-distributed](https://distributed.readthedocs.io/en/latest/) library. To enable this, pass the IP and port of the dask scheduler to the training script with the '--dask-scheduler' option. The GENERATE.sh script passes all arguments to the training script so you can just give it as an argument to GENERATE.sh.
The training script supports multi-node parallel execution using the [kubeface](https://github.com/hammerlab/kubeface) library.
We run dask distributed on Google Container Engine using Kubernetes as described [here](https://github.com/hammerlab/dask-distributed-on-kubernetes).
To use kubeface, create a Google Storage bucket and pass it to the training script with the --storage-prefix argument, as shown below.
To generate this download we run:
```
# If you are running dask distributed using our kubernetes config, you can use the DASK_IP one liner below.
# Otherwise, just set it to the IP of the dask scheduler.
DASK_IP=$(kubectl get service | grep daskd-scheduler | tr -s ' ' | cut -d ' ' -f 3)
./GENERATE.sh \
--joblib-num-jobs 100 \
--joblib-pre-dispatch all \
--cv-folds-per-task 10 \
--dask-scheduler $DASK_IP:8786
--backend kubernetes \
--storage-prefix gs://kubeface \
--worker-image hammerlab/mhcflurry:latest \
--kubernetes-task-resources-memory-mb 10000 \
--worker-path-prefix venv-py3/bin \
--max-simultaneous-tasks 200 \
```
......@@ -3,13 +3,12 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
import json
models = HYPERPARAMETER_DEFAULTS.models_grid(
#impute=[False, True],
impute=[False],
impute=[False, True],
activation=["tanh"],
layer_sizes=[[12], [64], [128]],
embedding_output_dim=[8, 32, 64],
dropout_probability=[0, .1, .25],
# fraction_negative=[0, .1, .2],
fraction_negative=[0, .1, .2],
n_training_epochs=[250])
sys.stderr.write("Models: %d\n" % len(models))
......
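The grid above is the Cartesian product of the listed hyperparameter values. A minimal standalone sketch of that expansion (assuming `models_grid` behaves this way):
```
# Sketch of expanding a hyperparameter grid into individual model specs
# (assumption: models_grid takes the Cartesian product of the value lists).
import itertools

grid = {
    "impute": [False, True],
    "activation": ["tanh"],
    "layer_sizes": [[12], [64], [128]],
    "embedding_output_dim": [8, 32, 64],
    "dropout_probability": [0, .1, .25],
    "fraction_negative": [0, .1, .2],
    "n_training_epochs": [250],
}
models = [
    dict(zip(grid, values))
    for values in itertools.product(*grid.values())
]
print(len(models))  # 2 * 1 * 3 * 3 * 3 * 3 * 1 = 162
```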
......@@ -142,6 +142,7 @@ def cross_validation_folds(
alleles = train_data.unique_alleles()
result_folds = []
imputation_args = []
for allele in alleles:
logging.info("Allele: %s" % allele)
cv_iter = train_data.cross_validation_iterator(
......@@ -165,27 +166,33 @@ def cross_validation_folds(
test_split = full_test_split
if imputer is not None:
imputation_future = parallel_backend.submit(
impute_and_select_allele,
all_allele_train_split,
base_args = dict(impute_kwargs)
base_args.update(dict(
dataset=all_allele_train_split,
imputer=imputer,
allele=allele,
**impute_kwargs)
else:
imputation_future = None
allele=allele))
imputation_args.append(base_args)
train_split = all_allele_train_split.get_allele(allele)
fold = AlleleSpecificTrainTestFold(
imputed_train=None, # updated later
allele=allele,
train=train_split,
imputed_train=imputation_future,
test=test_split)
result_folds.append(fold)
return [
result_fold._replace(imputed_train=(
result_fold.imputed_train.result()
if result_fold.imputed_train is not None
else None))
for result_fold in result_folds
]
if imputation_args:
assert len(imputation_args) == len(result_folds)
imputation_results = parallel_backend.map(
lambda kwargs: impute_and_select_allele(**kwargs),
imputation_args)
# Here _replace is a method on named tuples that returns a new named
# tuple with the specified key set to the given value and all other key/
# values the same as the original.
return [
result_fold._replace(imputed_train=imputation_result)
for (result_fold, imputation_result) in zip(
result_folds, imputation_results)
]
return result_folds
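The `_replace` call used here is the standard named-tuple idiom for producing an updated copy; a small self-contained example:
```
# namedtuple._replace returns a new tuple with the given fields updated and
# every other field copied; the original tuple is left unchanged.
from collections import namedtuple

Fold = namedtuple("Fold", ["allele", "train", "imputed_train", "test"])
fold = Fold(allele="HLA-A0201", train="train", imputed_train=None, test="test")
updated = fold._replace(imputed_train="imputed")

assert fold.imputed_train is None
assert updated.imputed_train == "imputed"
```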
......@@ -169,6 +169,12 @@ parser.add_argument(
default=False,
help="Output more info")
try:
import kubeface
kubeface.Client.add_args(parser)
except ImportError:
logging.error("Kubeface support disabled, not installed.")
def run(argv=sys.argv[1:]):
args = parser.parse_args(argv)
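The try/except above registers kubeface's command-line options only when the library is importable. A generic standalone sketch of the same optional-dependency pattern (the fallback log message is illustrative):
```
# Register extra CLI arguments only when an optional dependency is present.
import argparse
import logging

parser = argparse.ArgumentParser()
parser.add_argument("--verbose", action="store_true")

try:
    import kubeface  # optional dependency
    kubeface.Client.add_args(parser)  # kubeface adds its own flags
except ImportError:
    logging.info("kubeface not installed; its options are unavailable")
```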
......@@ -183,6 +189,8 @@ def run(argv=sys.argv[1:]):
if args.dask_scheduler:
backend = parallelism.DaskDistributedParallelBackend(
args.dask_scheduler)
elif hasattr(args, 'storage_prefix') and args.storage_prefix:
backend = parallelism.KubefaceParallelBackend(args)
else:
if args.num_local_processes:
backend = parallelism.ConcurrentFuturesParallelBackend(
......@@ -306,23 +314,24 @@ def go(args):
logging.info("")
train_folds = []
train_models = []
imputation_args_list = []
best_architectures = []
for (allele_num, allele) in enumerate(cv_results.allele.unique()):
best_index = best_architectures_by_allele[allele]
architecture = model_architectures[best_index]
best_architectures.append(architecture)
train_models.append(architecture)
logging.info(
"Allele: %s best architecture is index %d: %s" %
(allele, best_index, architecture))
if architecture['impute']:
imputation_future = backend.submit(
impute_and_select_allele,
train_data,
imputation_args = dict(impute_kwargs)
imputation_args.update(dict(
dataset=train_data,
imputer=imputer,
allele=allele,
**impute_kwargs)
else:
imputation_future = None
allele=allele))
imputation_args_list.append(imputation_args)
test_data_this_allele = None
if test_data is not None:
......@@ -330,25 +339,26 @@ def go(args):
fold = AlleleSpecificTrainTestFold(
allele=allele,
train=train_data.get_allele(allele),
# Here we set imputed_train to the imputation *task* if
# imputation was used on this fold. We set this to the actual
# imputed training dataset a few lines farther down. This
# complexity is because we want to be able to parallelize
# the imputations so we have to queue up the tasks first.
# If we are not doing imputation then the imputation_task
# is None.
imputed_train=imputation_future,
imputed_train=None,
test=test_data_this_allele)
train_folds.append(fold)
train_folds = [
result_fold._replace(imputed_train=(
result_fold.imputed_train.result()
if result_fold.imputed_train is not None
else None))
for result_fold in train_folds
]
if imputation_args_list:
imputation_results = list(backend.map(
lambda kwargs: impute_and_select_allele(**kwargs),
imputation_args_list))
new_train_folds = []
for (best_architecture, train_fold) in zip(
best_architectures, train_folds):
imputed_train = None
if best_architecture['impute']:
imputed_train = imputation_results.pop(0)
new_train_folds.append(
train_fold._replace(imputed_train=imputed_train))
assert not imputation_results
train_folds = new_train_folds
logging.info("Training %d production models" % len(train_folds))
start = time.time()
......
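The change in this hunk replaces per-fold futures with a queue-map-reassemble pattern: collect the imputation keyword arguments for every fold that needs imputation, run them all through the backend's `map`, then pop the results back onto those folds in order. A standalone sketch with illustrative names (not the actual mhcflurry API):
```
# Queue-map-reassemble sketch. impute() stands in for the real imputation.
def impute(dataset, allele):
    return "imputed-%s" % allele

folds = [{"allele": "A", "impute": True},
         {"allele": "B", "impute": False},
         {"allele": "C", "impute": True}]

# 1. Queue kwargs only for folds that requested imputation.
pending = [
    {"dataset": "train", "allele": fold["allele"]}
    for fold in folds if fold["impute"]
]

# 2. Run every queued imputation through a map-style backend (plain map here).
results = list(map(lambda kwargs: impute(**kwargs), pending))

# 3. Pop results back onto the folds in the order they were queued.
for fold in folds:
    fold["imputed_train"] = results.pop(0) if fold["impute"] else None
assert not results  # every queued result was consumed
```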
......@@ -14,10 +14,31 @@ class ParallelBackend(object):
self.module = module
self.verbose = verbose
def submit(self, func, *args, **kwargs):
if self.verbose > 0:
logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
return self.executor.submit(func, *args, **kwargs)
class KubefaceParallelBackend(ParallelBackend):
"""
ParallelBackend that uses kubeface
"""
def __init__(self, args):
from kubeface import Client # pylint: disable=import-error
self.client = Client.from_args(args)
def map(self, func, iterable):
return self.client.map(func, iterable)
def __str__(self):
return "<Kubeface backend, client=%s>" % self.client
class DaskDistributedParallelBackend(ParallelBackend):
"""
ParallelBackend that uses dask.distributed
"""
def __init__(self, scheduler_ip_and_port, verbose=1):
from dask import distributed # pylint: disable=import-error
executor = distributed.Executor(scheduler_ip_and_port)
ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
self.scheduler_ip_and_port = scheduler_ip_and_port
def map(self, func, iterable):
fs = [
......@@ -35,17 +56,6 @@ class ParallelBackend(object):
return [result_dict[future] for future in fs]
class DaskDistributedParallelBackend(ParallelBackend):
"""
ParallelBackend that uses dask.distributed
"""
def __init__(self, scheduler_ip_and_port, verbose=1):
from dask import distributed # pylint: disable=import-error
executor = distributed.Executor(scheduler_ip_and_port)
ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
self.scheduler_ip_and_port = scheduler_ip_and_port
def __str__(self):
return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
self.scheduler_ip_and_port,
......@@ -70,6 +80,22 @@ class ConcurrentFuturesParallelBackend(ParallelBackend):
return "<Concurrent futures %s parallel backend, num workers = %d>" % (
("processes" if self.processes else "threads"), self.num_workers)
def map(self, func, iterable):
fs = [
self.executor.submit(func, arg) for arg in iterable
]
return self.wait(fs)
def wait(self, fs):
result_dict = {}
for finished_future in self.module.as_completed(fs):
result = finished_future.result()
logging.info("%3d / %3d tasks completed" % (
len(result_dict), len(fs)))
result_dict[finished_future] = result
return [result_dict[future] for future in fs]
def set_default_backend(backend):
global DEFAULT_BACKEND
......
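The `map`/`wait` pair added to ConcurrentFuturesParallelBackend gathers results as tasks finish (so progress can be logged) but returns them in submission order. A standalone sketch of that idiom with `concurrent.futures`:
```
# Collect results with as_completed, then return them in submission order.
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    fs = [executor.submit(square, arg) for arg in range(10)]
    result_dict = {}
    for finished in as_completed(fs):
        result_dict[finished] = finished.result()
    results = [result_dict[future] for future in fs]

assert results == [x * x for x in range(10)]
```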
......@@ -13,6 +13,12 @@ from mhcflurry.class1_allele_specific import cv_and_train_command
from mhcflurry import downloads, predict
from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
try:
import kubeface
KUBEFACE_INSTALLED = True
except ImportError:
KUBEFACE_INSTALLED = False
def test_small_run():
base_temp_dir = tempfile.mkdtemp()
......@@ -62,6 +68,11 @@ def test_small_run():
"--verbose",
"--num-local-threads", "1",
]
if KUBEFACE_INSTALLED:
# If kubeface is installed, then this command will by default use it.
# In that case, we want to have the kubeface storage written to a
# local file and not assume the existence of a google storage bucket.
args.extend(["--storage-prefix", "/tmp/"])
print("Running cv_and_train_command with args: %s " % str(args))
cv_and_train_command.run(args)
......
......@@ -77,7 +77,7 @@ def test_cross_validation_with_imputation():
n_imputations=2, n_burn_in=1, n_nearest_columns=25)
train_data = (
mhcflurry.dataset.Dataset.from_csv(
get_path("data_kim2014" , "bdata.2009.mhci.public.1.txt"))
get_path("data_kim2014", "bdata.2009.mhci.public.1.txt"))
.get_alleles(["HLA-A0201", "HLA-A0202", "HLA-A0301"]))
folds = cross_validation_folds(
......