Commit eeb07c9a authored by Tim O'Donnell, committed by GitHub

Merge pull request #60 from hammerlab/additions2

Kim2014 comparison, mhcflurry-predict tool, reimplement parallelization with futures
parents 770a1e21 00d545ec
Showing with 1782 additions and 166 deletions
@@ -35,10 +35,17 @@ install:
 - pip install -r requirements.txt
 - pip install .
 - pip install coveralls
+env:
+  global:
+    - PYTHONHASHSEED=0
+  matrix:
+    # Enable this eventually after getting tensorflow to build on travis:
+    # - KERAS_BACKEND=theano KERAS_BACKEND=tensorflow
+    - KERAS_BACKEND=theano
 script:
   # download data and models, then run tests
   - mhcflurry-downloads fetch
   - mhcflurry-downloads info  # just to test this command works
-  - PYTHONHASHSEED=0 nosetests test --with-coverage --cover-package=mhcflurry && ./lint.sh
+  - nosetests test --with-coverage --cover-package=mhcflurry && ./lint.sh
 after_success:
   coveralls
@@ -56,6 +56,9 @@ RUN virtualenv venv-py3 --python=python3 && \
         scikit-learn \
         seaborn

+ENV KERAS_BACKEND theano
+# RUN venv-py3/bin/pip install --upgrade https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp35-cp35m-linux_x86_64.whl
+
 # Install mhcflurry and download data and models.
 COPY . ./mhcflurry
 RUN venv-py3/bin/pip install ./mhcflurry && venv-py3/bin/mhcflurry-downloads fetch
......
@@ -31,7 +31,24 @@ From a checkout you can run the unit tests with:
 nosetests .
 ```

-## Making predictions
+## Making predictions from the command-line
+
+```shell
+$ mhcflurry-predict --alleles HLA-A0201 HLA-A0301 --peptides SIINFEKL SIINFEKD SIINFEKQ
+Predicting for 2 alleles and 3 peptides = 6 predictions
+allele,peptide,mhcflurry_prediction
+HLA-A0201,SIINFEKL,10672.34765625
+HLA-A0201,SIINFEKD,26042.716796875
+HLA-A0201,SIINFEKQ,26375.794921875
+HLA-A0301,SIINFEKL,25532.703125
+HLA-A0301,SIINFEKD,24997.876953125
+HLA-A0301,SIINFEKQ,28262.828125
+```
+
+You can also specify the input and output as CSV files. Run `mhcflurry-predict -h` for details.
+
+## Making predictions from Python

 ```python
 from mhcflurry import predict
@@ -47,7 +64,9 @@ The predictions returned by `predict` are affinities (KD) in nM.

 ## Training your own models

-This [unit test](https://github.com/hammerlab/mhcflurry/blob/master/test/test_class1_binding_predictor_A0205.py) gives a simple example of how to train a predictor in Python. There is also a script called `mhcflurry-class1-allele-specific-cv-and-train` that will perform cross validation and model selection given a CSV file of training data. Try `mhcflurry-class1-allele-specific-cv-and-train --help` for details.
+See the [class1_allele_specific_models.ipynb](https://github.com/hammerlab/mhcflurry/blob/master/examples/class1_allele_specific_models.ipynb) notebook for an overview of the Python API, including predicting, fitting, and scoring models.
+
+There is also a script called `mhcflurry-class1-allele-specific-cv-and-train` that will perform cross validation and model selection given a CSV file of training data. Try `mhcflurry-class1-allele-specific-cv-and-train --help` for details.

 ## Details on the downloaded class I allele-specific models
......
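For reference, the Python snippet in the README hunk above is cut off by the diff context. A minimal sketch of the documented `predict` API (assuming models have already been fetched with `mhcflurry-downloads fetch`):

```python
from mhcflurry import predict

# Returns a pandas.DataFrame with columns "Allele", "Peptide", "Prediction".
# Predictions are affinities (KD) in nM, as noted in the README.
df = predict(alleles=["HLA-A0201"], peptides=["SIINFEKL", "SIINFEKD"])
print(df)
```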
@@ -13,6 +13,7 @@ DOWNLOAD_NAME=models_class1_allele_specific_single
 SCRATCH_DIR=/tmp/mhcflurry-downloads-generation
 SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
 SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
+export PYTHONUNBUFFERED=1

 mkdir -p "$SCRATCH_DIR"
 rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
......
#!/bin/bash
if [[ $# -eq 0 ]] ; then
echo 'WARNING: This script is intended to be called with additional arguments to pass to mhcflurry-class1-allele-specific-cv-and-train'
echo 'At minimum you probably want to pass --dask-scheduler <IP:PORT> as training many models on one node is extremely '
echo 'slow.'
fi
set -e
set -x
DOWNLOAD_NAME=models_class1_allele_specific_single_kim2014_only
SCRATCH_DIR=/tmp/mhcflurry-downloads-generation
SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
export PYTHONUNBUFFERED=1
mkdir -p "$SCRATCH_DIR"
rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
# Send stdout and stderr to a logfile included with the archive.
exec > >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
# Log some environment info
date
pip freeze
git rev-parse HEAD
git status
cd $SCRATCH_DIR/$DOWNLOAD_NAME
mkdir models
cp $SCRIPT_DIR/models.py $SCRIPT_DIR/imputer.json .
python models.py > models.json
time mhcflurry-class1-allele-specific-cv-and-train \
--model-architectures models.json \
--imputer-description imputer.json \
--train-data "$(mhcflurry-downloads path data_kim2014)/bdata.2009.mhci.public.1.txt" \
--test-data "$(mhcflurry-downloads path data_kim2014)/bdata.2013.mhci.public.blind.1.txt" \
--min-samples-per-allele 50 \
--out-cv-results cv.csv \
--out-production-results production.csv \
--out-models models \
--verbose \
"$@"
cp $SCRIPT_ABSOLUTE_PATH .
tar -cjf "../${DOWNLOAD_NAME}.tar.bz2" *
echo "Created archive: $SCRATCH_DIR/$DOWNLOAD_NAME.tar.bz2"
# Class I allele specific models (single) trained and tested on the Kim 2014 dataset
This is a reimplementation of the analysis in [Predicting Peptide-MHC Binding Affinities With Imputed Training Data](http://biorxiv.org/content/early/2016/05/22/054775).
{
"imputation_method_name": "mice",
"n_burn_in": 5,
"n_imputations": 50,
"n_nearest_columns": 25,
"min_observations_per_peptide": 2,
"min_observations_per_allele": 2
}
import sys
from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
import json
models = HYPERPARAMETER_DEFAULTS.models_grid(
#impute=[False, True],
impute=[False],
activation=["tanh"],
layer_sizes=[[12], [64], [128]],
embedding_output_dim=[8, 32, 64],
dropout_probability=[0, .1, .25],
# fraction_negative=[0, .1, .2],
n_training_epochs=[250])
sys.stderr.write("Models: %d\n" % len(models))
print(json.dumps(models, indent=4))
@@ -18,8 +18,6 @@ from .predict import predict
 from .package_metadata import __version__
 from . import parallelism

-parallelism.configure_joblib()
-
 __all__ = [
     "Class1BindingPredictor",
     "predict",
......
@@ -20,11 +20,10 @@ from __future__ import (
 import collections
 import logging

-from joblib import Parallel, delayed
 import pepdata

 from .train import impute_and_select_allele, AlleleSpecificTrainTestFold
+from ..parallelism import get_default_backend

 gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4")
@@ -100,9 +99,7 @@ def cross_validation_folds(
             'min_observations_per_peptide': 2,
             'min_observations_per_allele': 2,
         },
-        n_jobs=1,
-        verbose=0,
-        pre_dispatch='2*n_jobs'):
+        parallel_backend=None):
     '''
     Split a Dataset into n_folds cross validation folds for each allele,
     optionally performing imputation.
@@ -129,30 +126,22 @@ def cross_validation_folds(
     impute_kwargs : dict, optional
         Additional kwargs to pass to mhcflurry.Dataset.impute_missing_values.

-    n_jobs : integer, optional
-        The number of jobs to run in parallel. If -1, then the number of jobs
-        is set to the number of cores.
-
-    verbose : integer, optional
-        The joblib verbosity. If non zero, progress messages are printed. Above
-        50, the output is sent to stdout. The frequency of the messages
-        increases with the verbosity level. If it more than 10, all iterations
-        are reported.
-
-    pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
-        The number of joblib batches (of tasks) to be pre-dispatched. Default
-        is "2*n_jobs".
+    parallel_backend : mhcflurry.parallelism.ParallelBackend, optional
+        Futures implementation to use for running on multiple threads,
+        processes, or nodes

     Returns
     -----------
     list of AlleleSpecificTrainTestFold of length num alleles * n_folds
     '''
+    if parallel_backend is None:
+        parallel_backend = get_default_backend()
+
     if alleles is None:
         alleles = train_data.unique_alleles()

-    result = []
-    imputation_tasks = []
+    result_folds = []

     for allele in alleles:
         logging.info("Allele: %s" % allele)
         cv_iter = train_data.cross_validation_iterator(
@@ -176,31 +165,27 @@ def cross_validation_folds(
             test_split = full_test_split

         if imputer is not None:
-            imputation_tasks.append(delayed(impute_and_select_allele)(
+            imputation_future = parallel_backend.submit(
+                impute_and_select_allele,
                 all_allele_train_split,
                 imputer=imputer,
                 allele=allele,
-                **impute_kwargs))
+                **impute_kwargs)
+        else:
+            imputation_future = None

         train_split = all_allele_train_split.get_allele(allele)

         fold = AlleleSpecificTrainTestFold(
             allele=allele,
             train=train_split,
-            imputed_train=None,
+            imputed_train=imputation_future,
             test=test_split)

-        result.append(fold)
-
-    if imputer is not None:
-        imputation_results = Parallel(
-            n_jobs=n_jobs,
-            verbose=verbose,
-            pre_dispatch=pre_dispatch)(imputation_tasks)
-
-        result = [
-            result_fold._replace(
-                imputed_train=imputation_result)
-            for (imputation_result, result_fold)
-            in zip(imputation_results, result)
-            if imputation_result is not None
-        ]
-    return result
+        result_folds.append(fold)
+
+    return [
+        result_fold._replace(imputed_train=(
+            result_fold.imputed_train.result()
+            if result_fold.imputed_train is not None
+            else None))
+        for result_fold in result_folds
+    ]
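To illustrate how the new `parallel_backend` argument replaces the old joblib knobs, here is a minimal sketch. It assumes the first positional argument is the training `Dataset` and that `Dataset` accepts a pandas DataFrame with allele/peptide/affinity columns, as other hunks in this merge suggest; with `imputer=None` the backend is only exercised when imputation is requested.

```python
import pandas
from mhcflurry.dataset import Dataset
from mhcflurry.parallelism import ConcurrentFuturesParallelBackend
from mhcflurry.class1_allele_specific.cross_validation import cross_validation_folds

# Tiny illustrative training set; real runs would use e.g. the Kim 2014 download.
train_data = Dataset(pandas.DataFrame({
    "allele": ["HLA-A0201"] * 6,
    "peptide": ["SIINFEKL", "SIINFEKD", "SIINFEKQ", "SIINFEKA", "SIINFEKG", "SIINFEKM"],
    "affinity": [120.0, 25000.0, 26000.0, 300.0, 3000.0, 12000.0],
}))

# Imputation futures (if an imputer were given) would run on this local pool.
backend = ConcurrentFuturesParallelBackend(num_workers=2, processes=True)
folds = cross_validation_folds(train_data, parallel_backend=backend)
print(len(folds))  # num alleles * n_folds
```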
@@ -22,19 +22,14 @@ What it does:

 Features:

 * Supports imputation as a hyperparameter that can be searched over
-* Parallelized with joblib
+* Parallelized with concurrent.futures

 Note:

-The joblib-based parallelization is primary intended to be used with an
-alternative joblib backend such as dask-distributed that supports
+The parallelization is primarily intended to be used with an
+alternative concurrent.futures Executor such as dask-distributed that supports
 multi-node parallelization. Theano in particular seems to have deadlocks
 when running with single-node parallelization.
-
-Also, when using the multiprocessing backend for joblib (the default),
-the 'fork' mode causes a library we use to hang. We have to instead use
-the 'spawn' or 'forkserver' modes. See:
-https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
 '''
from __future__ import ( from __future__ import (
print_function, print_function,
...@@ -52,8 +47,8 @@ import hashlib ...@@ -52,8 +47,8 @@ import hashlib
import pickle import pickle
import numpy import numpy
import joblib
from .. import parallelism
from ..dataset import Dataset from ..dataset import Dataset
from ..imputation_helpers import imputer_from_name from ..imputation_helpers import imputer_from_name
from .cross_validation import cross_validation_folds from .cross_validation import cross_validation_folds
@@ -142,18 +137,17 @@ parser.add_argument(
     help="Host and port of dask distributed scheduler")

 parser.add_argument(
-    "--joblib-num-jobs",
-    type=int,
-    default=1,
-    metavar="N",
-    help="Number of joblib workers. Set to -1 to use as many jobs as cores. "
-    "Default: %(default)s")
+    "--num-local-processes",
+    metavar="N",
+    type=int,
+    help="Processes (exclusive with --dask-scheduler and --num-local-threads)")

 parser.add_argument(
-    "--joblib-pre-dispatch",
-    metavar="STRING",
-    default='2*n_jobs',
-    help="Tasks to initially dispatch to joblib. Default: %(default)s")
+    "--num-local-threads",
+    metavar="N",
+    type=int,
+    default=1,
+    help="Threads (exclusive with --dask-scheduler and --num-local-processes)")

 parser.add_argument(
     "--min-samples-per-allele",
@@ -178,28 +172,35 @@ parser.add_argument(
 def run(argv=sys.argv[1:]):
     args = parser.parse_args(argv)

-    if not args.quiet:
-        logging.basicConfig(level="INFO")
     if args.verbose:
-        logging.basicConfig(level="DEBUG")
+        logging.root.setLevel(level="DEBUG")
+    elif not args.quiet:
+        logging.root.setLevel(level="INFO")
+
+    logging.info("Running with arguments: %s" % args)
+
+    # Set parallel backend
     if args.dask_scheduler:
-        import distributed.joblib  # for side effects
-        backend = joblib.parallel_backend(
-            'distributed',
-            scheduler_host=args.dask_scheduler)
-        with backend:
-            active_backend = joblib.parallel.get_active_backend()[0]
-            logging.info(
-                "Running with dask scheduler: %s [%d cores]" % (
-                    args.dask_scheduler,
-                    active_backend.effective_n_jobs()))
-            go(args)
+        backend = parallelism.DaskDistributedParallelBackend(
+            args.dask_scheduler)
     else:
-        go(args)
+        if args.num_local_processes:
+            backend = parallelism.ConcurrentFuturesParallelBackend(
+                args.num_local_processes,
+                processes=True)
+        else:
+            backend = parallelism.ConcurrentFuturesParallelBackend(
+                args.num_local_threads,
+                processes=False)
+
+    parallelism.set_default_backend(backend)
+    logging.info("Using parallel backend: %s" % backend)
+    go(args)


 def go(args):
+    backend = parallelism.get_default_backend()
     model_architectures = json.loads(args.model_architectures.read())
     logging.info("Read %d model architectures" % len(model_architectures))
     if args.max_models:
@@ -251,10 +252,7 @@ def go(args):
         imputer=imputer,
         impute_kwargs=impute_kwargs,
         drop_similar_peptides=True,
-        alleles=args.alleles,
-        n_jobs=args.joblib_num_jobs,
-        pre_dispatch=args.joblib_pre_dispatch,
-        verbose=1 if not args.quiet else 0)
+        alleles=args.alleles)

     logging.info(
         "Training %d model architectures across %d folds = %d models"
@@ -266,10 +264,7 @@ def go(args):
     cv_results = train_across_models_and_folds(
         cv_folds,
         model_architectures,
-        folds_per_task=args.cv_folds_per_task,
-        n_jobs=args.joblib_num_jobs,
-        verbose=1 if not args.quiet else 0,
-        pre_dispatch=args.joblib_pre_dispatch)
+        folds_per_task=args.cv_folds_per_task)

     logging.info(
         "Completed cross validation in %0.2f seconds" % (time.time() - start))
@@ -311,7 +306,6 @@ def go(args):
     logging.info("")

     train_folds = []
     train_models = []
-    imputation_tasks = []
     for (allele_num, allele) in enumerate(cv_results.allele.unique()):
         best_index = best_architectures_by_allele[allele]
         architecture = model_architectures[best_index]
@@ -321,14 +315,14 @@ def go(args):
             (allele, best_index, architecture))

         if architecture['impute']:
-            imputation_task = joblib.delayed(impute_and_select_allele)(
+            imputation_future = backend.submit(
+                impute_and_select_allele,
                 train_data,
                 imputer=imputer,
                 allele=allele,
                 **impute_kwargs)
-            imputation_tasks.append(imputation_task)
         else:
-            imputation_task = None
+            imputation_future = None

         test_data_this_allele = None
         if test_data is not None:
@@ -344,29 +338,17 @@ def go(args):
             # the imputations so we have to queue up the tasks first.
             # If we are not doing imputation then the imputation_task
             # is None.
-            imputed_train=imputation_task,
+            imputed_train=imputation_future,
             test=test_data_this_allele)
         train_folds.append(fold)

-    if imputation_tasks:
-        logging.info(
-            "Waiting for %d full-data imputation tasks to complete"
-            % len(imputation_tasks))
-        imputation_results = joblib.Parallel(
-            n_jobs=args.joblib_num_jobs,
-            verbose=1 if not args.quiet else 0,
-            pre_dispatch=args.joblib_pre_dispatch)(imputation_tasks)
-
-        train_folds = [
-            train_fold._replace(
-                # Now we replace imputed_train with the actual imputed
-                # dataset.
-                imputed_train=imputation_results.pop(0)
-                if (train_fold.imputed_train is not None) else None)
-            for train_fold in train_folds
-        ]
-        assert not imputation_results
-        del imputation_tasks
+    train_folds = [
+        result_fold._replace(imputed_train=(
+            result_fold.imputed_train.result()
+            if result_fold.imputed_train is not None
+            else None))
+        for result_fold in train_folds
+    ]

     logging.info("Training %d production models" % len(train_folds))
     start = time.time()
@@ -374,10 +356,7 @@
         train_folds,
         train_models,
         cartesian_product_of_folds_and_models=False,
-        return_predictors=args.out_models_dir is not None,
-        n_jobs=args.joblib_num_jobs,
-        verbose=1 if not args.quiet else 0,
-        pre_dispatch=args.joblib_pre_dispatch)
+        return_predictors=args.out_models_dir is not None)
     logging.info(
         "Completed production training in %0.2f seconds"
         % (time.time() - start))
......
@@ -25,7 +25,7 @@ from os.path import join
 import pandas

 from ..downloads import get_path
-from ..common import normalize_allele_name
+from ..common import normalize_allele_name, UnsupportedAllele

 CACHED_LOADER = None
@@ -113,7 +113,7 @@ class Class1AlleleSpecificPredictorLoader(object):
         try:
             predictor_name = self.df.ix[allele_name].predictor_name
         except KeyError:
-            raise ValueError(
+            raise UnsupportedAllele(
                 "No models for allele '%s'. Alleles with models: %s"
                 " in models file: %s" % (
                     allele_name,
......
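The point of the new `UnsupportedAllele` exception is that callers can distinguish "no model for this allele" from other failures. A hedged sketch (it assumes the module-level `load.from_allele_name` helper used by the previous version of `predict` is still available, and uses a deliberately bogus allele name):

```python
from mhcflurry.class1_allele_specific import load
from mhcflurry.common import UnsupportedAllele

try:
    # "HLA-A9999" is a made-up allele with no trained model.
    model = load.from_allele_name("HLA-A9999")
except UnsupportedAllele as e:
    print("No model for this allele: %s" % e)
```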
@@ -28,11 +28,11 @@ import pandas
 import mhcflurry

-from joblib import Parallel, delayed
-
 from .scoring import make_scores
 from .class1_binding_predictor import Class1BindingPredictor
 from ..hyperparameters import HyperparameterDefaults
+from ..parallelism import get_default_backend

 TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
 HYPERPARAMETER_DEFAULTS = (
@@ -239,9 +239,7 @@ def train_across_models_and_folds(
         cartesian_product_of_folds_and_models=True,
         return_predictors=False,
         folds_per_task=1,
-        n_jobs=1,
-        verbose=0,
-        pre_dispatch='2*n_jobs'):
+        parallel_backend=None):
     '''
     Train and optionally test any number of models across any number of folds.
@@ -261,24 +259,17 @@ def train_across_models_and_folds(
     return_predictors : boolean, optional
         Include the trained predictors in the result.

-    n_jobs : integer, optional
-        The number of jobs to run in parallel. If -1, then the number of jobs
-        is set to the number of cores.
-
-    verbose : integer, optional
-        The joblib verbosity. If non zero, progress messages are printed. Above
-        50, the output is sent to stdout. The frequency of the messages
-        increases with the verbosity level. If it more than 10, all iterations
-        are reported.
-
-    pre_dispatch : {"all", integer, or expression, as in "3*n_jobs"}
-        The number of joblib batches (of tasks) to be pre-dispatched. Default
-        is "2*n_jobs".
+    parallel_backend : mhcflurry.parallelism.ParallelBackend, optional
+        Futures implementation to use for running on multiple threads,
+        processes, or nodes

     Returns
     -----------
     pandas.DataFrame
     '''
+    if parallel_backend is None:
+        parallel_backend = get_default_backend()
+
     if cartesian_product_of_folds_and_models:
         tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
         fold_index_groups = [[] for _ in range(tasks_per_model)]
@@ -307,15 +298,16 @@ def train_across_models_and_folds(
     logging.info("Training %d architectures on %d folds = %d tasks." % (
         len(model_descriptions), len(folds), len(task_model_and_fold_indices)))

-    task_results = Parallel(
-        n_jobs=n_jobs,
-        verbose=verbose,
-        pre_dispatch=pre_dispatch)(
-            delayed(train_and_test_one_model)(
-                model_descriptions[model_num],
-                [folds[i] for i in fold_nums],
-                return_predictor=return_predictors)
-            for (model_num, fold_nums) in task_model_and_fold_indices)
+    def train_and_test_one_model_task(model_and_fold_nums_pair):
+        (model_num, fold_nums) = model_and_fold_nums_pair
+        return train_and_test_one_model(
+            model_descriptions[model_num],
+            [folds[i] for i in fold_nums],
+            return_predictor=return_predictors)
+
+    task_results = parallel_backend.map(
+        train_and_test_one_model_task,
+        task_model_and_fold_indices)

     logging.info("Done.")
......
@@ -20,6 +20,10 @@ from collections import defaultdict
 import numpy as np


+class UnsupportedAllele(Exception):
+    pass
+
+
 def parse_int_list(s):
     return [int(part.strip()) for part in s.split(",")]
......
@@ -70,7 +70,9 @@ class Dataset(object):
         for expected_column_name in {"allele", "peptide", "affinity"}:
             if expected_column_name not in columns:
-                raise ValueError("Missing column '%s' from DataFrame")
+                raise ValueError(
+                    "Missing column '%s' from DataFrame" %
+                    expected_column_name)

         # make allele and peptide columns the index, and copy it
         # so we can add a column without any observable side-effect in
         # the calling code
......
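The fix above makes the error message actually name the missing column. A minimal sketch of the behavior, assuming the `Dataset` constructor takes a pandas DataFrame as this hunk suggests:

```python
import pandas
from mhcflurry.dataset import Dataset

# Valid: has the three required columns.
dataset = Dataset(pandas.DataFrame({
    "allele": ["HLA-A0201"],
    "peptide": ["SIINFEKL"],
    "affinity": [500.0],
}))

# Invalid: 'affinity' is missing, and the message now says so.
try:
    Dataset(pandas.DataFrame({"allele": ["HLA-A0201"], "peptide": ["SIINFEKL"]}))
except ValueError as e:
    print(e)  # e.g. "Missing column 'affinity' from DataFrame"
```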
-import multiprocessing
-import logging
-
-import joblib.parallel
-
-
-def configure_joblib(multiprocessing_mode="spawn"):
-    """
-    Set joblib's default multiprocessing mode.
-
-    The default used in joblib is "fork" which causes a library we use to
-    deadlock. This function defaults to setting the multiprocessing mode
-    to "spawn", which does not deadlock. On Python 3.4, you can also try
-    the "forkserver" mode which does not deadlock and has better
-    performance.
-
-    See: https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
-
-    Parameters
-    -------------
-    multiprocessing_mode : string, one of "spawn", "fork", or "forkserver"
-    """
-    if hasattr(multiprocessing, "get_context"):
-        joblib.parallel.DEFAULT_MP_CONTEXT = multiprocessing.get_context(
-            multiprocessing_mode)
-    else:
-        logging.warn(
-            "You will probably get deadlocks on Python earlier than 3.4 "
-            "if you set n_jobs to anything other than 1.")
+from concurrent import futures
+import logging
+
+DEFAULT_BACKEND = None
+
+
+class ParallelBackend(object):
+    """
+    Thin wrapper of futures implementations. Designed to support
+    concurrent.futures as well as dask.distributed's workalike implementation.
+    """
+    def __init__(self, executor, module, verbose=1):
+        self.executor = executor
+        self.module = module
+        self.verbose = verbose
+
+    def submit(self, func, *args, **kwargs):
+        if self.verbose > 0:
+            logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
+        return self.executor.submit(func, *args, **kwargs)
+
+    def map(self, func, iterable):
+        fs = [
+            self.executor.submit(func, arg) for arg in iterable
+        ]
+        return self.wait(fs)
+
+    def wait(self, fs):
+        result_dict = {}
+        for finished_future in self.module.as_completed(fs):
+            result = finished_future.result()
+            logging.info("%3d / %3d tasks completed" % (
+                len(result_dict), len(fs)))
+            result_dict[finished_future] = result
+        return [result_dict[future] for future in fs]
+
+
+class DaskDistributedParallelBackend(ParallelBackend):
+    """
+    ParallelBackend that uses dask.distributed
+    """
+    def __init__(self, scheduler_ip_and_port, verbose=1):
+        from dask import distributed  # pylint: disable=import-error
+        executor = distributed.Executor(scheduler_ip_and_port)
+        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
+        self.scheduler_ip_and_port = scheduler_ip_and_port
+
+    def __str__(self):
+        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
+            self.scheduler_ip_and_port,
+            sum(self.executor.ncores().values()))
+
+
+class ConcurrentFuturesParallelBackend(ParallelBackend):
+    """
+    ParallelBackend that uses Python's concurrent.futures module.
+    Can use either threads or processes.
+    """
+    def __init__(self, num_workers=1, processes=False, verbose=1):
+        if processes:
+            executor = futures.ProcessPoolExecutor(num_workers)
+        else:
+            executor = futures.ThreadPoolExecutor(num_workers)
+        ParallelBackend.__init__(self, executor, futures, verbose=verbose)
+        self.num_workers = num_workers
+        self.processes = processes
+
+    def __str__(self):
+        return "<Concurrent futures %s parallel backend, num workers = %d>" % (
+            ("processes" if self.processes else "threads"), self.num_workers)
+
+
+def set_default_backend(backend):
+    global DEFAULT_BACKEND
+    DEFAULT_BACKEND = backend
+
+
+def get_default_backend():
+    global DEFAULT_BACKEND
+    if DEFAULT_BACKEND is None:
+        set_default_backend(ConcurrentFuturesParallelBackend())
+    return DEFAULT_BACKEND
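For reference, a minimal sketch of how the new backend abstraction is meant to be used (local workers here; the dask.distributed variant only needs a scheduler address, shown commented out with a placeholder):

```python
from mhcflurry.parallelism import (
    ConcurrentFuturesParallelBackend,
    set_default_backend,
    get_default_backend)

def square(x):
    return x * x

# Thread pool with 4 workers; processes=True would use a process pool instead.
set_default_backend(ConcurrentFuturesParallelBackend(num_workers=4, processes=False))

# map() submits one task per item and returns results in input order.
print(get_default_backend().map(square, range(10)))

# Multi-node alternative (placeholder address, requires a running dask scheduler):
# from mhcflurry.parallelism import DaskDistributedParallelBackend
# set_default_backend(DaskDistributedParallelBackend("127.0.0.1:8786"))
```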
@@ -17,11 +17,14 @@ from collections import OrderedDict
 import pandas as pd

 from .class1_allele_specific import load
-from .common import normalize_allele_name
+from .common import normalize_allele_name, UnsupportedAllele


-def predict(alleles, peptides):
+def predict(alleles, peptides, loaders=None):
     """
+    Make predictions across all combinations of the specified alleles and
+    peptides.
+
     Parameters
     ----------
     alleles : list of str
@@ -30,8 +33,15 @@ def predict(alleles, peptides):
     peptides : list of str
         Peptide amino acid sequences.

+    loaders : list of Class1AlleleSpecificPredictorLoader, optional
+        Loaders to try. Will be tried in the order given.
+
     Returns DataFrame with columns "Allele", "Peptide", and "Prediction"
     """
+    if loaders is None:
+        loaders = [
+            load.get_loader_for_downloaded_models(),
+        ]
     result_dict = OrderedDict([
         ("Allele", []),
         ("Peptide", []),
@@ -39,7 +49,22 @@ def predict(alleles, peptides):
     ])
     for allele in alleles:
         allele = normalize_allele_name(allele)
-        model = load.from_allele_name(allele)
+        exceptions = {}  # loader -> UnsupportedAllele exception
+        model = None
+        for loader in loaders:
+            try:
+                model = loader.from_allele_name(allele)
+                break
+            except UnsupportedAllele as e:
+                exceptions[loader] = e
+        if model is None:
+            raise UnsupportedAllele(
+                "No loaders support allele '%s'. Errors were:\n%s" % (
+                    allele,
+                    "\n".join(
+                        ("\t%-20s : %s" % (k, v))
+                        for (k, v) in exceptions.items())))
         for i, ic50 in enumerate(model.predict(peptides)):
             result_dict["Allele"].append(allele)
             result_dict["Peptide"].append(peptides[i])
......
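A sketch of the new `loaders` argument, which lets `predict` fall back across model sources. The models directory below is a hypothetical placeholder; by default the downloaded models are used:

```python
from mhcflurry import predict
from mhcflurry.class1_allele_specific import load

# Try a custom models directory first, then fall back to the downloaded models.
custom_loader = load.Class1AlleleSpecificPredictorLoader("/path/to/my/models")
result = predict(
    alleles=["HLA-A0201"],
    peptides=["SIINFEKL"],
    loaders=[custom_loader, load.get_loader_for_downloaded_models()])
print(result)
```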
# Copyright (c) 2016. Mount Sinai School of Medicine
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Run MHCflurry predictor on specified peptide/allele pairs.
Examples:
Write a CSV file containing the contents of INPUT.csv plus an
additional column giving MHCflurry binding affinity predictions:
mhcflurry-predict INPUT.csv --out RESULT.csv
The input CSV file is expected to contain columns 'allele' and 'peptide'.
The predictions are written to a column called 'mhcflurry_prediction'.
These default column names may be changed with the --allele-column,
--peptide-column, and --prediction-column options.
If --out is not specified, results are written to standard out.
You can also run on alleles and peptides specified on the commandline, in
which case predictions are written for all combinations of alleles and
peptides:
mhcflurry-predict --alleles HLA-A0201 H-2Kb --peptides SIINFEKL DENDREKLLL
'''
from __future__ import (
print_function,
division,
absolute_import,
)
import sys
import argparse
import logging
import pandas
import itertools
from .downloads import get_path
from . import class1_allele_specific
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
"input",
metavar="FILE.csv",
nargs="?",
help="Input CSV")
parser.add_argument(
"--out",
metavar="FILE.csv",
help="Output CSV")
parser.add_argument(
"--alleles",
metavar="ALLELE",
nargs="+",
help="Alleles to predict (exclusive with --input)")
parser.add_argument(
"--peptides",
metavar="PEPTIDE",
nargs="+",
help="Peptides to predict (exclusive with --input)")
parser.add_argument(
"--allele-column",
metavar="NAME",
default="allele",
help="Input column name for alleles. Default: '%(default)s'")
parser.add_argument(
"--peptide-column",
metavar="NAME",
default="peptide",
help="Input column name for peptides. Default: '%(default)s'")
parser.add_argument(
"--prediction-column",
metavar="NAME",
default="mhcflurry_prediction",
help="Output column name for predictions. Default: '%(default)s'")
parser.add_argument(
"--models-class1-allele-specific-single",
metavar="DIR",
default=get_path("models_class1_allele_specific_single"),
help="Directory containing class1 allele specific single models. "
"Default: '%(default)s'")
def run(argv=sys.argv[1:]):
args = parser.parse_args(argv)
if args.input:
if args.alleles or args.peptides:
parser.error(
"If an input file is specified, do not specify --alleles "
"or --peptides")
df = pandas.read_csv(args.input)
print("Read input CSV with %d rows, columns are: %s" % (
len(df), ", ".join(df.columns)))
for col in [args.allele_column, args.peptide_column]:
if col not in df.columns:
raise ValueError(
"No such column '%s' in CSV. Columns are: %s" % (
col, ", ".join(["'%s'" % c for c in df.columns])))
else:
if not args.alleles or not args.peptides:
parser.error(
"Specify either an input CSV file or both the "
"--alleles and --peptides arguments")
pairs = list(itertools.product(args.alleles, args.peptides))
df = pandas.DataFrame({
"allele": [p[0] for p in pairs],
"peptide": [p[1] for p in pairs],
})
print("Predicting for %d alleles and %d peptides = %d predictions" % (
len(args.alleles), len(args.peptides), len(df)))
class1_allele_specific_loader = (
class1_allele_specific.load.Class1AlleleSpecificPredictorLoader(
args.models_class1_allele_specific_single))
predictions = {} # allele -> peptide -> value
for (allele, sub_df) in df.groupby(args.allele_column):
logging.info("Running %d predictions for allele %s" % (
len(sub_df), allele))
model = class1_allele_specific_loader.from_allele_name(allele)
peptides = sub_df[args.peptide_column].values
predictions[allele] = dict(
(peptide, prediction)
for (peptide, prediction)
in zip(peptides, model.predict(peptides)))
logging.info("Collecting result")
df[args.prediction_column] = [
predictions[row[args.allele_column]][row[args.peptide_column]]
for (_, row) in df.iterrows()
]
if args.out:
df.to_csv(args.out, index=False)
print("Wrote: %s" % args.out)
else:
df.to_csv(sys.stdout, index=False)
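The CSV code path above can also be exercised directly from Python, which is handy in tests. A sketch (file names are placeholders; assumes models have been fetched with `mhcflurry-downloads fetch`):

```python
import pandas
from mhcflurry import predict_command

# Input CSV using the default 'allele' and 'peptide' column names.
pandas.DataFrame({
    "allele": ["HLA-A0201", "HLA-A0301"],
    "peptide": ["SIINFEKL", "SIINFEKL"],
}).to_csv("input.csv", index=False)

# Equivalent to: mhcflurry-predict input.csv --out result.csv
predict_command.run(["input.csv", "--out", "result.csv"])

# The output adds the 'mhcflurry_prediction' column.
print(pandas.read_csv("result.csv"))
```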
@@ -54,6 +54,7 @@ if __name__ == '__main__':
         entry_points={
             'console_scripts': [
                 'mhcflurry-downloads = mhcflurry.downloads_command:run',
+                'mhcflurry-predict = mhcflurry.predict_command:run',
                 'mhcflurry-class1-allele-specific-cv-and-train = '
                 'mhcflurry.class1_allele_specific.cv_and_train_command:run'
             ]
@@ -81,8 +82,7 @@ if __name__ == '__main__':
             'h5py',
             'typechecks',
             'pepdata',
-            'joblib',
-            'cherrypy',  # for multi-threaded web server
+            'futures',
             'bottle',
             'six',
         ],
......