Skip to content
Snippets Groups Projects
Commit 11ad19cd authored by Tim O'Donnell's avatar Tim O'Donnell
Browse files

Basic support for model selection

parent 51bb93e3
No related branches found
No related tags found
No related merge requests found
"""
Calibrate percentile ranks for models. Runs in-place.
"""
import argparse
import os
import signal
import sys
import time
import traceback
import random
from functools import partial
import numpy
import pandas
import yaml
from sklearn.metrics.pairwise import cosine_similarity
from mhcnames import normalize_allele_name
import tqdm # progress bar
tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
from .class1_affinity_predictor import Class1AffinityPredictor
from .common import configure_logging
from .parallelism import (
make_worker_pool, call_wrapped)
# To avoid pickling large matrices to send to child processes when running in
# parallel, we use this global variable as a place to store data. Data that is
# stored here before creating the thread pool will be inherited to the child
# processes upon fork() call, allowing us to share large data with the workers
# via shared memory.
GLOBAL_DATA = {}

# Command-line interface for percentile rank calibration.
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument(
    "--models-dir",
    required=True,
    metavar="DIR",
    help="Directory to read and write models")
parser.add_argument(
    "--allele",
    nargs="+",
    default=None,
    help=(
        "Alleles to train models for. If not specified, all alleles with "
        "enough measurements will be used."))
parser.add_argument(
    "--num-peptides-per-length",
    metavar="N",
    type=int,
    default=int(1e5),
    help=(
        "Number of peptides per length to use to calibrate percent ranks. "
        "Default: %(default)s."))
parser.add_argument(
    "--num-jobs",
    metavar="N",
    type=int,
    default=1,
    help=(
        "Number of processes to parallelize over. "
        "Set to 1 for serial run. Set to 0 to use number of cores. "
        "Default: %(default)s."))
parser.add_argument(
    "--max-tasks-per-worker",
    metavar="N",
    type=int,
    default=None,
    help=(
        "Restart workers after N tasks. Workaround for tensorflow memory "
        "leaks. Requires Python >=3.2."))
parser.add_argument(
    "--verbosity",
    type=int,
    default=0,
    help="Keras verbosity. Default: %(default)s")
def run(argv=sys.argv[1:]):
    """
    Entry point: calibrate percentile ranks, in place, for the models in
    --models-dir, optionally in parallel over alleles.
    """
    global GLOBAL_DATA

    # On sigusr1 print stack trace
    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())

    args = parser.parse_args(argv)
    args.models_dir = os.path.abspath(args.models_dir)
    configure_logging(verbose=args.verbosity > 1)

    predictor = Class1AffinityPredictor.load(args.models_dir)

    if args.allele:
        alleles = [normalize_allele_name(a) for a in args.allele]
    else:
        # Default to every allele the loaded predictor supports.
        alleles = predictor.supported_alleles

    start = time.time()
    print("Performing percent rank calibration. Encoding peptides.")
    # With alleles=[] this call only generates and returns the random
    # calibration peptides; no calibration happens yet.
    encoded_peptides = predictor.calibrate_percentile_ranks(
        alleles=[],  # don't actually do any calibration, just return peptides
        num_peptides_per_length=args.num_peptides_per_length)

    # Now we encode the peptides for each neural network, so the encoding
    # becomes cached.
    for network in predictor.neural_networks:
        network.peptides_to_network_input(encoded_peptides)
    assert encoded_peptides.encoding_cache  # must have cached the encoding
    print("Finished encoding peptides for percent ranks in %0.2f sec." % (
        time.time() - start))
    print("Calibrating percent rank calibration for %d alleles." % len(alleles))

    if args.num_jobs == 1:
        # Serial run
        print("Running in serial.")
        worker_pool = None
        # Lazy generator: each allele is calibrated as it is consumed below.
        results = (
            calibrate_percentile_ranks(
                allele=allele,
                predictor=predictor,
                peptides=encoded_peptides)
            for allele in alleles)
    else:
        # Parallel run
        # Store peptides in global variable so they are in shared memory
        # after fork, instead of needing to be pickled.
        GLOBAL_DATA["calibration_peptides"] = encoded_peptides
        worker_pool = make_worker_pool(
            processes=(
                args.num_jobs
                if args.num_jobs else None),
            max_tasks_per_worker=args.max_tasks_per_worker)
        # call_wrapped improves exception reporting from worker processes.
        # NOTE(review): the predictor is pickled once per task here; only the
        # peptides are shared via fork — confirm this is the intended tradeoff.
        results = worker_pool.imap_unordered(
            partial(
                partial(call_wrapped, calibrate_percentile_ranks),
                predictor=predictor),
            alleles,
            chunksize=1)

    # Merge each worker's {allele: transform} result into the predictor.
    for result in tqdm.tqdm(results, total=len(alleles)):
        predictor.allele_to_percent_rank_transform.update(result)

    print("Done calibrating %d additional alleles." % len(alleles))
    # model_names_to_write=[] writes metadata (including the new percent rank
    # transforms) without re-writing any model weights.
    predictor.save(args.models_dir, model_names_to_write=[])

    percent_rank_calibration_time = time.time() - start

    if worker_pool:
        worker_pool.close()
        worker_pool.join()

    print("Percent rank calibration time: %0.2f min." % (
        percent_rank_calibration_time / 60.0))
    print("Predictor written to: %s" % args.models_dir)
def calibrate_percentile_ranks(allele, predictor, peptides=None):
    """
    Calibrate percentile ranks for a single allele.

    Returns a one-entry dict mapping the allele to its fitted percent-rank
    transform, suitable for dict.update() on the parent predictor's
    allele_to_percent_rank_transform.
    """
    global GLOBAL_DATA
    # Fall back to the peptides stashed in shared memory by the parent
    # process before fork() when none are passed explicitly.
    calibration_peptides = (
        GLOBAL_DATA["calibration_peptides"] if peptides is None else peptides)
    predictor.calibrate_percentile_ranks(
        peptides=calibration_peptides, alleles=[allele])
    transform = predictor.allele_to_percent_rank_transform[allele]
    return {allele: transform}
# Allow running this module directly as a script in addition to the
# installed console entry point.
if __name__ == '__main__':
    run()
......@@ -47,7 +47,8 @@ class Class1AffinityPredictor(object):
class1_pan_allele_models=None,
allele_to_fixed_length_sequence=None,
manifest_df=None,
allele_to_percent_rank_transform=None):
allele_to_percent_rank_transform=None,
metadata_dataframes=None):
"""
Parameters
----------
......@@ -68,6 +69,10 @@ class Class1AffinityPredictor(object):
allele_to_percent_rank_transform : dict of string -> `PercentRankTransform`, optional
`PercentRankTransform` instances to use for each allele
metadata_dataframes : dict of string -> pandas.DataFrame, optional
Optional additional dataframes to write to the models dir when
save() is called. Useful for tracking provenance.
"""
if allele_to_allele_specific_models is None:
......@@ -107,6 +112,7 @@ class Class1AffinityPredictor(object):
if not allele_to_percent_rank_transform:
allele_to_percent_rank_transform = {}
self.allele_to_percent_rank_transform = allele_to_percent_rank_transform
self.metadata_dataframes = metadata_dataframes
self._cache = {}
def clear_cache(self):
......@@ -264,7 +270,7 @@ class Class1AffinityPredictor(object):
self._cache["supported_peptide_lengths"] = result
return self._cache["supported_peptide_lengths"]
def save(self, models_dir, model_names_to_write=None):
def save(self, models_dir, model_names_to_write=None, write_metadata=True):
"""
Serialize the predictor to a directory on disk. If the directory does
not exist it will be created.
......@@ -284,6 +290,9 @@ class Class1AffinityPredictor(object):
model_names_to_write : list of string, optional
Only write the weights for the specified models. Useful for
incremental updates during training.
write_metadata : boolean, optional
Whether to write optional metadata
"""
num_models = len(self.class1_pan_allele_models) + sum(
len(v) for v in self.allele_to_allele_specific_models.values())
......@@ -315,16 +324,22 @@ class Class1AffinityPredictor(object):
write_manifest_df.to_csv(manifest_path, index=False)
logging.info("Wrote: %s" % manifest_path)
# Write "info.txt"
info_path = join(models_dir, "info.txt")
rows = [
("trained on", time.asctime()),
("package ", "mhcflurry %s" % __version__),
("hostname ", gethostname()),
("user ", getuser()),
]
pandas.DataFrame(rows).to_csv(
info_path, sep="\t", header=False, index=False)
if write_metadata:
# Write "info.txt"
info_path = join(models_dir, "info.txt")
rows = [
("trained on", time.asctime()),
("package ", "mhcflurry %s" % __version__),
("hostname ", gethostname()),
("user ", getuser()),
]
pandas.DataFrame(rows).to_csv(
info_path, sep="\t", header=False, index=False)
if self.metadata_dataframes:
for (name, df) in self.metadata_dataframes.items():
metadata_df_path = join(models_dir, "%s.csv.bz2" % name)
df.to_csv(metadata_df_path, index=False, compression="bz2")
if self.allele_to_fixed_length_sequence is not None:
allele_to_sequence_df = pandas.DataFrame(
......@@ -1132,4 +1147,93 @@ class Class1AffinityPredictor(object):
allele_to_allele_specific_models=allele_to_allele_specific_models,
class1_pan_allele_models=class1_pan_allele_models,
allele_to_fixed_length_sequence=self.allele_to_fixed_length_sequence,
)
\ No newline at end of file
)
def model_select(
        self,
        score_function,
        alleles=None,
        min_models=1,
        max_models=10000):
    """
    Perform model selection using a user-specified scoring function.
    Model selection is done using a "step up" variable selection procedure,
    in which models are repeatedly added to an ensemble until the score
    stops improving.

    Parameters
    ----------
    score_function : Class1AffinityPredictor -> float function
        Scoring function
    alleles : list of string, optional
        If not specified, model selection is performed for all alleles.
    min_models : int, optional
        Min models to select per allele
    max_models : int, optional
        Max models to select per allele

    Returns
    -------
    Class1AffinityPredictor : predictor containing the selected models
    """
    if alleles is None:
        alleles = self.supported_alleles

    dfs = []
    allele_to_allele_specific_models = {}
    for allele in alleles:
        # One row per candidate model for this allele.
        df = pandas.DataFrame({
            'model': self.allele_to_allele_specific_models[allele]
        })
        df["model_num"] = df.index
        df["allele"] = allele
        df["selected"] = False

        round_num = 1
        while not df.selected.all() and sum(df.selected) < max_models:
            # NOTE: "%2d" space-pads the round number (e.g. "score_ 1");
            # the names are still unique per round so this is harmless.
            score_col = "score_%2d" % round_num
            prev_score_col = "score_%2d" % (round_num - 1)

            existing_selected = list(df[df.selected].model)

            # Score every not-yet-selected model as if it were added to the
            # current ensemble; already-selected models get NaN.
            df[score_col] = [
                numpy.nan if row.selected else
                score_function(
                    Class1AffinityPredictor(
                        allele_to_allele_specific_models={
                            allele: [row.model] + existing_selected
                        }))
                for (_, row) in df.iterrows()
            ]
            # Stop once adding another model no longer improves the best
            # score (only considered after min_models have been selected).
            # NOTE(review): prev_score_col is "score_ 0" on round 1, which
            # does not exist; this is only safe because round_num starts at
            # 1 and min_models defaults to >= 1 — confirm min_models=0 is
            # never passed.
            if round_num > min_models and (
                    df[score_col].max() < df[prev_score_col].max()):
                break

            # In case of a tie, pick a model at random.
            (best_model_index,) = df.loc[
                (df[score_col] == df[score_col].max())
            ].sample(1).index
            df.loc[best_model_index, "selected"] = True
            round_num += 1

        dfs.append(df)
        print("Selected %d models for allele %s" % (
            df.selected.sum(), allele))
        allele_to_allele_specific_models[allele] = list(
            df.loc[df.selected].model)

    # Keep the full per-round score table as provenance metadata.
    df = pandas.concat(dfs, ignore_index=True)

    new_predictor = Class1AffinityPredictor(
        allele_to_allele_specific_models,
        metadata_dataframes={
            "model_selection": df,
        })
    return new_predictor
......@@ -3,6 +3,8 @@ import collections
import logging
import sys
import os
from struct import unpack
from hashlib import sha256
import numpy
import pandas
......
......@@ -4,6 +4,11 @@ from multiprocessing import Pool, Queue, cpu_count
from six.moves import queue
from multiprocessing.util import Finalize
from pprint import pprint
import random
import numpy
from .common import set_keras_backend
def make_worker_pool(
......@@ -102,6 +107,17 @@ def worker_init_entry_point(
init_function(**kwargs)
def worker_init(keras_backend=None, gpu_device_nums=None):
    """
    Initialize a forked worker process.

    Reseeds the RNGs (so workers do not inherit identical random state from
    the parent) and, when requested, configures the Keras backend / GPU
    devices for this worker.
    """
    # Each worker needs distinct random numbers
    numpy.random.seed()
    random.seed()
    if not (keras_backend or gpu_device_nums):
        return
    print("WORKER pid=%d assigned GPU devices: %s" % (
        os.getpid(), gpu_device_nums))
    set_keras_backend(
        keras_backend, gpu_device_nums=gpu_device_nums)
# Solution suggested in https://bugs.python.org/issue13831
class WrapException(Exception):
"""
......
"""
Model select class1 single allele models.
"""
import argparse
import os
import signal
import sys
import time
import traceback
import random
from functools import partial
import pandas
from scipy.stats import kendalltau
from mhcnames import normalize_allele_name
import tqdm # progress bar
tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
from .class1_affinity_predictor import Class1AffinityPredictor
from .encodable_sequences import EncodableSequences
from .common import configure_logging, random_peptides
from .parallelism import make_worker_pool
from .regression_target import from_ic50
# To avoid pickling large matrices to send to child processes when running in
# parallel, we use this global variable as a place to store data. Data that is
# stored here before creating the thread pool will be inherited to the child
# processes upon fork() call, allowing us to share large data with the workers
# via shared memory.
GLOBAL_DATA = {}

# Command-line interface for model selection.
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument(
    "--data",
    metavar="FILE.csv",
    required=False,
    help=(
        "Model selection data CSV. Expected columns: "
        "allele, peptide, measurement_value"))
parser.add_argument(
    "--exclude-data",
    metavar="FILE.csv",
    required=False,
    help=(
        "Data to EXCLUDE from model selection. Useful to specify the original "
        "training data used"))
parser.add_argument(
    "--models-dir",
    metavar="DIR",
    required=True,
    help="Directory to read models")
parser.add_argument(
    "--out-models-dir",
    metavar="DIR",
    required=True,
    help="Directory to write selected models")
parser.add_argument(
    "--allele",
    default=None,
    nargs="+",
    help="Alleles to select models for. If not specified, all alleles with "
    "enough measurements will be used.")
parser.add_argument(
    "--min-measurements-per-allele",
    type=int,
    metavar="N",
    default=50,
    help="Min number of data points required for data-driven model selection")
parser.add_argument(
    "--min-models",
    type=int,
    default=8,
    metavar="N",
    help="Min number of models to select per allele")
parser.add_argument(
    "--max-models",
    type=int,
    default=15,
    metavar="N",
    help="Max number of models to select per allele")
parser.add_argument(
    "--scoring",
    nargs="+",
    choices=("mse", "mass-spec", "consensus"),
    default=["mse", "consensus"],
    help="Scoring procedures to use in order")
parser.add_argument(
    "--consensus-num-peptides-per-length",
    type=int,
    default=100000,
    help="Num peptides per length to use for consensus scoring")
parser.add_argument(
    "--num-jobs",
    default=1,
    type=int,
    metavar="N",
    help="Number of processes to parallelize selection over. "
    "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
# BUGFIX: run() reads args.max_tasks_per_worker in the parallel code path,
# but this option was never declared, causing an AttributeError. Declared
# here with the same semantics as the other mhcflurry commands.
parser.add_argument(
    "--max-tasks-per-worker",
    type=int,
    metavar="N",
    default=None,
    help="Restart workers after N tasks. Workaround for tensorflow memory "
    "leaks. Requires Python >=3.2.")
parser.add_argument(
    "--backend",
    choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"),
    help="Keras backend. If not specified will use system default.")
parser.add_argument(
    "--verbosity",
    type=int,
    help="Keras verbosity. Default: %(default)s",
    default=0)
def run(argv=sys.argv[1:]):
    """
    Entry point: select models for each allele and write a new predictor
    containing only the selected models to --out-models-dir.
    """
    global GLOBAL_DATA

    # On sigusr1 print stack trace
    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())

    args = parser.parse_args(argv)

    args.out_models_dir = os.path.abspath(args.out_models_dir)

    configure_logging(verbose=args.verbosity > 1)

    input_predictor = Class1AffinityPredictor.load(args.models_dir)
    print("Loaded: %s" % input_predictor)

    if args.allele:
        alleles = [normalize_allele_name(a) for a in args.allele]
    else:
        alleles = input_predictor.supported_alleles

    # BUGFIX: initialize metadata_dfs unconditionally. It was previously only
    # defined when --data was given, but it is used below in both cases,
    # causing a NameError for consensus-only (no --data) runs.
    metadata_dfs = {}

    if args.data:
        df = pandas.read_csv(args.data)
        print("Loaded data: %s" % (str(df.shape)))
        # BUGFIX: DataFrame.ix is deprecated (and removed in pandas >= 1.0);
        # use .loc for boolean indexing.
        df = df.loc[
            (df.peptide.str.len() >= 8) & (df.peptide.str.len() <= 15)
        ]
        print("Subselected to 8-15mers: %s" % (str(df.shape)))

        # Allele names in data are assumed to be already normalized.
        df = df.loc[df.allele.isin(alleles)].dropna()
        print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles)))

        if args.exclude_data:
            exclude_df = pandas.read_csv(args.exclude_data)
            metadata_dfs["model_selection_exclude"] = exclude_df
            # BUGFIX: report the shape of the exclude data, not df.
            print("Loaded exclude data: %s" % (str(exclude_df.shape)))

            # Drop any measurement that also appears in the exclude set,
            # matched on (allele, peptide).
            df["_key"] = df.allele + "__" + df.peptide
            exclude_df["_key"] = exclude_df.allele + "__" + exclude_df.peptide
            df["_excluded"] = df._key.isin(exclude_df._key.unique())
            print("Excluding measurements per allele (counts): ")
            print(df.groupby("allele")._excluded.sum())
            print("Excluding measurements per allele (fractions): ")
            print(df.groupby("allele")._excluded.mean())

            df = df.loc[~df._excluded]
            print("Reduced data to: %s" % (str(df.shape)))
    else:
        df = None

    model_selection_kwargs = {
        'min_models': args.min_models,
        'max_models': args.max_models,
    }

    # Instantiate one selector per requested scoring procedure.
    selectors = {}
    for scoring in args.scoring:
        if scoring == "mse":
            selector = MSEModelSelector(
                df=df,
                predictor=input_predictor,
                min_measurements=args.min_measurements_per_allele,
                model_selection_kwargs=model_selection_kwargs)
        elif scoring == "consensus":
            selector = ConsensusModelSelector(
                predictor=input_predictor,
                num_peptides_per_length=args.consensus_num_peptides_per_length,
                model_selection_kwargs=model_selection_kwargs)
        else:
            # BUGFIX: an unimplemented choice (e.g. "mass-spec") previously
            # either reused the prior loop iteration's selector silently or
            # raised a confusing NameError. Fail fast with a clear message.
            raise ValueError("Unsupported scoring method: %s" % scoring)
        selectors[scoring] = selector

    # For each allele, pick the first (highest-priority) usable selector.
    print("Selectors for alleles:")
    allele_to_selector = {}
    for allele in alleles:
        selector = None
        for possible_selector in args.scoring:
            if selectors[possible_selector].usable_for_allele(allele=allele):
                selector = selectors[possible_selector]
                print("%20s %s" % (allele, possible_selector))
                break
        if selector is None:
            raise ValueError("No selectors usable for allele: %s" % allele)
        allele_to_selector[allele] = selector

    # Share the selectors with forked workers via module-global memory.
    GLOBAL_DATA["allele_to_selector"] = allele_to_selector

    if not os.path.exists(args.out_models_dir):
        print("Attempting to create directory: %s" % args.out_models_dir)
        os.mkdir(args.out_models_dir)
        print("Done.")

    metadata_dfs["model_selection_data"] = df

    result_predictor = Class1AffinityPredictor(metadata_dataframes=metadata_dfs)

    start = time.time()

    if args.num_jobs == 1:
        # Serial run
        print("Running in serial.")
        worker_pool = None
        results = (
            model_select(allele) for allele in alleles)
    else:
        # Parallel run. allele_to_selector is inherited by the workers via
        # fork(), avoiding pickling of the (large) selectors.
        worker_pool = make_worker_pool(
            processes=(
                args.num_jobs
                if args.num_jobs else None),
            # getattr keeps this robust if the parser predates the
            # --max-tasks-per-worker option.
            max_tasks_per_worker=getattr(args, "max_tasks_per_worker", None))

        # Shuffle so long-running alleles are spread across workers.
        random.shuffle(alleles)
        results = worker_pool.imap_unordered(
            model_select,
            alleles,
            chunksize=1)

    for result in tqdm.tqdm(results, total=len(alleles)):
        result_predictor.merge_in_place([result])

    print("Done model selecting for %d alleles." % len(alleles))
    result_predictor.save(args.out_models_dir)

    model_selection_time = time.time() - start

    if worker_pool:
        worker_pool.close()
        worker_pool.join()

    print("Model selection time %0.2f min." % (model_selection_time / 60.0))
    # BUGFIX: report the directory actually written (--out-models-dir), not
    # the input --models-dir.
    print("Predictor written to: %s" % args.out_models_dir)
def model_select(allele):
    """
    Worker entry point: run model selection for one allele.

    Looks up the allele's selector from GLOBAL_DATA (populated in the
    parent process before forking) and returns the selector's result.
    """
    global GLOBAL_DATA
    allele_to_selector = GLOBAL_DATA["allele_to_selector"]
    return allele_to_selector[allele].select(allele)
class ConsensusModelSelector(object):
    """
    Model selector that scores a candidate ensemble by its Kendall-tau
    agreement with the full ensemble's predictions over random peptides.
    Requires no measurement data, so it is usable for any allele.
    """

    def __init__(
            self,
            predictor,
            num_peptides_per_length=100000,
            model_selection_kwargs=None):
        """
        Parameters
        ----------
        predictor : Class1AffinityPredictor
            Predictor whose models are being selected.
        num_peptides_per_length : int, optional
            Number of random peptides per length to score against.
        model_selection_kwargs : dict, optional
            Extra keyword arguments passed to predictor.model_select().
        """
        # BUGFIX: a mutable default argument ({}) would be shared across all
        # instances; use None as the sentinel instead.
        if model_selection_kwargs is None:
            model_selection_kwargs = {}
        (min_length, max_length) = predictor.supported_peptide_lengths
        peptides = []
        for length in range(min_length, max_length + 1):
            peptides.extend(
                random_peptides(num_peptides_per_length, length=length))

        self.peptides = EncodableSequences.create(peptides)
        self.predictor = predictor
        self.model_selection_kwargs = model_selection_kwargs

        # Run predictions for one allele so self.peptides caches the encoding.
        self.predictor.predict(
            allele=self.predictor.supported_alleles[0],
            peptides=self.peptides)

    def usable_for_allele(self, allele):
        """Consensus scoring needs no measurements, so any allele works."""
        return True

    def score_function(self, allele, ensemble_predictions, predictor):
        """Kendall-tau correlation of predictor vs. full-ensemble predictions."""
        predictions = predictor.predict(
            allele=allele,
            peptides=self.peptides,
        )
        return kendalltau(predictions, ensemble_predictions).correlation

    def select(self, allele):
        """Run step-up model selection for one allele."""
        full_ensemble_predictions = self.predictor.predict(
            allele=allele,
            peptides=self.peptides)

        return self.predictor.model_select(
            score_function=partial(
                self.score_function, allele, full_ensemble_predictions),
            alleles=[allele],
            **self.model_selection_kwargs
        )
class MSEModelSelector(object):
    """
    Model selector that scores a candidate ensemble by (negative) mean
    squared error against held-out affinity measurements. Usable only for
    alleles with at least min_measurements data points.
    """

    def __init__(
            self,
            df,
            predictor,
            model_selection_kwargs=None,
            min_measurements=1):
        """
        Parameters
        ----------
        df : pandas.DataFrame
            Measurements with "allele", "peptide", "measurement_value" (and
            optionally "measurement_inequality") columns.
        predictor : Class1AffinityPredictor
            Predictor whose models are being selected.
        model_selection_kwargs : dict, optional
            Extra keyword arguments passed to predictor.model_select().
        min_measurements : int, optional
            Minimum measurements required for an allele to be selectable.
        """
        # BUGFIX: a mutable default argument ({}) would be shared across all
        # instances; use None as the sentinel instead.
        if model_selection_kwargs is None:
            model_selection_kwargs = {}
        self.df = df
        self.predictor = predictor
        self.model_selection_kwargs = model_selection_kwargs
        self.min_measurements = min_measurements

    def usable_for_allele(self, allele):
        """Return True if the data has enough measurements for this allele."""
        return (self.df.allele == allele).sum() >= self.min_measurements

    @staticmethod
    def score_function(allele, sub_df, peptides, predictor):
        """
        Negative MSE of predictions vs. measurements in from_ic50-transformed
        (0-1) space. Higher is better.
        """
        predictions = predictor.predict(
            allele=allele,
            peptides=peptides,
        )
        deviations = from_ic50(predictions) - from_ic50(sub_df.measurement_value)

        if 'measurement_inequality' in sub_df.columns:
            # Must reverse meaning of inequality since we are working with
            # transformed 0-1 values, which are anti-correlated with the ic50s.
            # The measurement_inequality column is given in terms of ic50s.
            deviations.loc[
                ((sub_df.measurement_inequality == "<") & (deviations > 0)) |
                ((sub_df.measurement_inequality == ">") & (deviations < 0))
            ] = 0.0

        return -1 * (deviations**2).mean()

    def select(self, allele):
        """Run step-up model selection for one allele against its data."""
        sub_df = self.df.loc[self.df.allele == allele]
        peptides = EncodableSequences.create(sub_df.peptide.values)

        return self.predictor.model_select(
            score_function=partial(
                self.score_function, allele, sub_df, peptides),
            alleles=[allele],
            **self.model_selection_kwargs
        )
# Allow running this module directly as a script in addition to the
# installed console entry point.
if __name__ == '__main__':
    run()
......@@ -10,10 +10,10 @@ import traceback
import random
from functools import partial
import numpy
import pandas
import yaml
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import StratifiedKFold
from mhcnames import normalize_allele_name
import tqdm # progress bar
tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
......@@ -21,7 +21,7 @@ tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481
from .class1_affinity_predictor import Class1AffinityPredictor
from .common import configure_logging, set_keras_backend
from .parallelism import (
make_worker_pool, cpu_count, call_wrapped, call_wrapped_kwargs)
make_worker_pool, cpu_count, call_wrapped_kwargs, worker_init)
from .hyperparameters import HyperparameterDefaults
from .allele_encoding import AlleleEncoding
......@@ -70,19 +70,25 @@ parser.add_argument(
metavar="N",
default=50,
help="Train models for alleles with >=N measurements.")
parser.add_argument(
"--held-out-fraction-reciprocal",
type=int,
metavar="N",
default=None,
help="Hold out 1/N fraction of data (for e.g. subsequent model selection. "
"For example, specify 5 to hold out 20% of the data.")
parser.add_argument(
"--held-out-fraction-seed",
type=int,
metavar="N",
default=0,
help="Seed for randomizing which measurements are held out. Only matters "
"when --held-out-fraction is specified. Default: %(default)s.")
parser.add_argument(
"--ignore-inequalities",
action="store_true",
default=False,
help="Do not use affinity value inequalities even when present in data")
parser.add_argument(
"--percent-rank-calibration-num-peptides-per-length",
type=int,
metavar="N",
default=int(1e5),
help="Number of peptides per length to use to calibrate percent ranks. "
"Set to 0 to disable percent rank calibration. The resulting models will "
"not support percent ranks. Default: %(default)s.")
parser.add_argument(
"--n-models",
type=int,
......@@ -100,20 +106,12 @@ parser.add_argument(
"--allele-sequences",
metavar="FILE.csv",
help="Allele sequences file. Used for computing allele similarity matrix.")
parser.add_argument(
"--verbosity",
type=int,
help="Keras verbosity. Default: %(default)s",
default=0)
parser.add_argument(
"--num-jobs",
default=[1],
default=1,
type=int,
metavar="N",
nargs="+",
help="Number of processes to parallelize training and percent rank "
"calibration over, respectively. Experimental. If only one value is specified "
"then the same number of jobs is used for both phases."
help="Number of processes to parallelize training over. Experimental. "
"Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.")
parser.add_argument(
"--backend",
......@@ -146,6 +144,11 @@ parser.add_argument(
default=None,
help="Restart workers after N tasks. Workaround for tensorflow memory "
"leaks. Requires Python >=3.2.")
parser.add_argument(
"--verbosity",
type=int,
help="Keras verbosity. Default: %(default)s",
default=0)
TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(
......@@ -196,8 +199,14 @@ def run(argv=sys.argv[1:]):
# Allele names in data are assumed to be already normalized.
df = df.loc[df.allele.isin(alleles)].dropna()
print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles)))
if args.held_out_fraction_reciprocal:
df = subselect_df_held_out(
df,
recriprocal_held_out_fraction=args.held_out_fraction_reciprocal,
seed=args.held_out_fraction_seed)
print("Training data: %s" % (str(df.shape)))
GLOBAL_DATA["train_data"] = df
......@@ -208,8 +217,11 @@ def run(argv=sys.argv[1:]):
os.mkdir(args.out_models_dir)
print("Done.")
predictor = Class1AffinityPredictor()
serial_run = args.num_jobs[0] == 1
predictor = Class1AffinityPredictor(
metadata_dataframes={
'train_data': df,
})
serial_run = args.num_jobs == 1
work_items = []
for (h, hyperparameters) in enumerate(hyperparameters_lst):
......@@ -219,14 +231,15 @@ def run(argv=sys.argv[1:]):
if args.n_models:
n_models = args.n_models
if not n_models:
raise ValueError("Specify --ensemble-size or n_models hyperparameter")
raise ValueError(
"Specify --ensemble-size or n_models hyperparameter")
if args.max_epochs:
hyperparameters['max_epochs'] = args.max_epochs
hyperparameters['train_data'] = TRAIN_DATA_HYPERPARAMETER_DEFAULTS.with_defaults(
hyperparameters.get('train_data', {})
)
hyperparameters['train_data'] = (
TRAIN_DATA_HYPERPARAMETER_DEFAULTS.with_defaults(
hyperparameters.get('train_data', {})))
if hyperparameters['train_data']['pretrain_min_points'] and (
'allele_similarity_matrix' not in GLOBAL_DATA):
......@@ -281,7 +294,7 @@ def run(argv=sys.argv[1:]):
set_keras_backend(args.backend)
else:
# Parallel run.
num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count()
num_workers = args.num_jobs if args.num_jobs else cpu_count()
worker_init_kwargs = None
if args.gpus:
print("Attempting to round-robin assign each worker a GPU.")
......@@ -342,7 +355,9 @@ def run(argv=sys.argv[1:]):
save_start = time.time()
new_model_names = predictor.merge_in_place(unsaved_predictors)
predictor.save(
args.out_models_dir, model_names_to_write=new_model_names)
args.out_models_dir,
model_names_to_write=new_model_names,
write_metadata=False)
print(
"Saved predictor (%d models total) including %d new models "
"in %0.2f sec to %s" % (
......@@ -353,10 +368,7 @@ def run(argv=sys.argv[1:]):
unsaved_predictors = []
last_save_time = time.time()
print("Saving final predictor to: %s" % args.out_models_dir)
predictor.merge_in_place(unsaved_predictors)
predictor.save(args.out_models_dir) # write all models just to be sure
print("Done.")
else:
# Run in serial. In this case, every worker is passed the same predictor,
......@@ -368,77 +380,20 @@ def run(argv=sys.argv[1:]):
assert work_predictor is predictor
assert not work_items
print("Saving final predictor to: %s" % args.out_models_dir)
predictor.save(args.out_models_dir) # write all models just to be sure
print("Done.")
print("*" * 30)
training_time = time.time() - start
print("Trained affinity predictor with %d networks in %0.2f min." % (
len(predictor.neural_networks), training_time / 60.0))
print("*" * 30)
if worker_pool:
worker_pool.close()
worker_pool.join()
worker_pool = None
start = time.time()
if args.percent_rank_calibration_num_peptides_per_length > 0:
alleles = list(predictor.supported_alleles)
print("Performing percent rank calibration. Encoding peptides.")
encoded_peptides = predictor.calibrate_percentile_ranks(
alleles=[], # don't actually do any calibration, just return peptides
num_peptides_per_length=args.percent_rank_calibration_num_peptides_per_length)
# Now we encode the peptides for each neural network, so the encoding
# becomes cached.
for network in predictor.neural_networks:
network.peptides_to_network_input(encoded_peptides)
assert encoded_peptides.encoding_cache # must have cached the encoding
print("Finished encoding peptides for percent ranks in %0.2f sec." % (
time.time() - start))
print("Calibrating percent rank calibration for %d alleles." % len(alleles))
if args.num_jobs[-1] == 1:
# Serial run
print("Running in serial.")
worker_pool = None
results = (
calibrate_percentile_ranks(
allele=allele,
predictor=predictor,
peptides=encoded_peptides)
for allele in alleles)
else:
# Parallel run
# Store peptides in global variable so they are in shared memory
# after fork, instead of needing to be pickled.
GLOBAL_DATA["calibration_peptides"] = encoded_peptides
worker_pool = make_worker_pool(
processes=(
args.num_jobs[-1]
if args.num_jobs[-1] else None),
max_tasks_per_worker=args.max_tasks_per_worker)
results = worker_pool.imap_unordered(
partial(
partial(call_wrapped, calibrate_percentile_ranks),
predictor=predictor),
alleles,
chunksize=1)
for result in tqdm.tqdm(results, total=len(alleles)):
predictor.allele_to_percent_rank_transform.update(result)
print("Done calibrating %d additional alleles." % len(alleles))
predictor.save(args.out_models_dir, model_names_to_write=[])
percent_rank_calibration_time = time.time() - start
if worker_pool:
worker_pool.close()
worker_pool.join()
print("Train time: %0.2f min. Percent rank calibration time: %0.2f min." % (
training_time / 60.0, percent_rank_calibration_time / 60.0))
print("Predictor written to: %s" % args.out_models_dir)
......@@ -448,7 +403,8 @@ def alleles_by_similarity(allele):
if allele not in allele_similarity.columns:
# Use random alleles
print("No similar alleles for: %s" % allele)
return [allele] + list(allele_similarity.columns.to_series().sample(frac=1.0))
return [allele] + list(
allele_similarity.columns.to_series().sample(frac=1.0))
return (
allele_similarity[allele] + (
allele_similarity.index == allele) # force that we return specified allele first
......@@ -528,30 +484,21 @@ def train_model(
return predictor
def calibrate_percentile_ranks(allele, predictor, peptides=None):
    """
    Private helper function.
    """
    global GLOBAL_DATA
    # When running in a forked worker, the peptides were stored in
    # GLOBAL_DATA by the parent process (shared memory after fork).
    if peptides is None:
        peptides = GLOBAL_DATA["calibration_peptides"]
    predictor.calibrate_percentile_ranks(
        peptides=peptides,
        alleles=[allele])
    # Return a one-entry dict suitable for dict.update() on the parent
    # predictor's allele_to_percent_rank_transform.
    return {
        allele: predictor.allele_to_percent_rank_transform[allele],
    }
def worker_init(keras_backend=None, gpu_device_nums=None):
    # Each worker needs distinct random numbers
    numpy.random.seed()
    random.seed()
    if keras_backend or gpu_device_nums:
        # Configure the Keras backend (and optionally pin GPU devices)
        # before any model code runs in this worker.
        print("WORKER pid=%d assigned GPU devices: %s" % (
            os.getpid(), gpu_device_nums))
        set_keras_backend(
            keras_backend, gpu_device_nums=gpu_device_nums)
def subselect_df_held_out(df, recriprocal_held_out_fraction=10, seed=0):
    """
    Drop a held-out 1/N fraction of measurements, stratified by allele and
    by binder (measurement_value <= 500) vs. nonbinder status.

    Parameters
    ----------
    df : pandas.DataFrame
        Training data with "allele" and "measurement_value" columns.
    recriprocal_held_out_fraction : int, optional
        Hold out 1/N of the data. (Parameter name keeps the historical
        misspelling of "reciprocal" for caller compatibility.)
    seed : int, optional
        Random seed controlling which measurements are held out.

    Returns
    -------
    pandas.DataFrame : the retained (non-held-out) rows of df.
    """
    kf = StratifiedKFold(
        n_splits=recriprocal_held_out_fraction,
        shuffle=True,
        random_state=seed)

    # Stratify by both allele and binder vs. nonbinder.
    # BUGFIX: build the stratification labels as a standalone array instead
    # of writing a temporary "key" column into df, which mutated the
    # caller's frame and leaked the column into the returned training data.
    # Vectorized; replaces the original per-row iterrows() loop.
    stratification_key = (
        df.allele.astype(str) +
        numpy.where(df.measurement_value <= 500, "_binder", "_nonbinder"))

    (train, _held_out) = next(kf.split(df, stratification_key))
    return df.iloc[train]
# Allow running this module directly as a script in addition to the
# installed console entry point.
if __name__ == '__main__':
    run()
......@@ -78,6 +78,10 @@ if __name__ == '__main__':
'mhcflurry-predict = mhcflurry.predict_command:run',
'mhcflurry-class1-train-allele-specific-models = '
'mhcflurry.train_allele_specific_models_command:run',
'mhcflurry-class1-select-allele-specific-models = '
'mhcflurry.select_allele_specific_models_command:run',
'mhcflurry-calibrate-percentile-ranks = '
'mhcflurry.calibrate_percentile_ranks_command:run',
]
},
classifiers=[
......
"""
Test train, calibrate percentile ranks, and model selection commands.
"""
import json
import os
import shutil
import tempfile
import subprocess
from copy import deepcopy
from numpy.testing import assert_array_less, assert_equal
......@@ -12,8 +17,9 @@ from mhcflurry.downloads import get_path
HYPERPARAMETERS = [
{
"n_models": 2,
"max_epochs": 100,
"max_epochs": 500,
"patience": 10,
"minibatch_size": 128,
"early_stopping": True,
"validation_split": 0.2,
......@@ -25,21 +31,16 @@ HYPERPARAMETERS = [
"kmer_size": 15,
"batch_normalization": False,
"locally_connected_layers": [
{
"filters": 8,
"activation": "tanh",
"kernel_size": 3
},
{
"filters": 8,
"activation": "tanh",
"kernel_size": 3
}
],
"activation": "relu",
"activation": "tanh",
"output_activation": "sigmoid",
"layer_sizes": [
32
16
],
"random_negative_affinity_min": 20000.0,
"random_negative_affinity_max": 50000.0,
......@@ -60,15 +61,24 @@ def run_and_check(n_jobs=0):
"mhcflurry-class1-train-allele-specific-models",
"--data", get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
"--hyperparameters", hyperparameters_filename,
"--allele", "HLA-A*02:01", "HLA-A*01:01", "HLA-A*03:01",
"--allele", "HLA-A*02:01", "HLA-A*03:01",
"--out-models-dir", models_dir,
"--percent-rank-calibration-num-peptides-per-length", "10000",
"--num-jobs", str(n_jobs),
"--ignore-inequalities",
]
print("Running with args: %s" % args)
subprocess.check_call(args)
# Calibrate percentile ranks
args = [
"mhcflurry-calibrate-percentile-ranks",
"--models-dir", models_dir,
"--num-peptides-per-length", "10000",
"--num-jobs", str(n_jobs),
]
print("Running with args: %s" % args)
subprocess.check_call(args)
result = Class1AffinityPredictor.load(models_dir)
predictions = result.predict(
peptides=["SLYNTVATL"],
......@@ -85,9 +95,72 @@ def run_and_check(n_jobs=0):
shutil.rmtree(models_dir)
def test_run_and_check_with_model_selection(n_jobs=1):
    """
    End-to-end test: train with a held-out fraction, then run the model
    selection command, and check that only the trained (max_epochs=500)
    architecture gets selected.
    """
    models_dir1 = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    hyperparameters_filename = os.path.join(
        models_dir1, "hyperparameters.yaml")

    # Include one architecture that has max_epochs = 0. We check that it never
    # gets selected in model selection.
    hyperparameters = [
        deepcopy(HYPERPARAMETERS[0]),
        deepcopy(HYPERPARAMETERS[0]),
    ]
    hyperparameters[-1]["max_epochs"] = 0
    with open(hyperparameters_filename, "w") as fd:
        json.dump(hyperparameters, fd)

    args = [
        "mhcflurry-class1-train-allele-specific-models",
        "--data", get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
        "--hyperparameters", hyperparameters_filename,
        "--allele", "HLA-A*02:01", "HLA-A*03:01",
        "--out-models-dir", models_dir1,
        "--num-jobs", str(n_jobs),
        "--ignore-inequalities",
        "--held-out-fraction-reciprocal", "10",
        "--n-models", "1",
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    # 2 alleles x 2 architectures x 1 model each = 4 networks trained.
    result = Class1AffinityPredictor.load(models_dir1)
    assert_equal(len(result.neural_networks), 4)

    models_dir2 = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    args = [
        "mhcflurry-class1-select-allele-specific-models",
        "--data",
        get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
        "--exclude-data", models_dir1 + "/train_data.csv.bz2",
        "--out-models-dir", models_dir2,
        "--models-dir", models_dir1,
        "--num-jobs", str(n_jobs),
        "--max-models", "1"
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    result = Class1AffinityPredictor.load(models_dir2)
    assert_equal(len(result.neural_networks), 2)
    assert_equal(
        len(result.allele_to_allele_specific_models["HLA-A*02:01"]), 1)
    assert_equal(
        len(result.allele_to_allele_specific_models["HLA-A*03:01"]), 1)
    # The untrained (max_epochs=0) architecture must never win selection.
    assert_equal(
        result.allele_to_allele_specific_models[
            "HLA-A*02:01"][0].hyperparameters["max_epochs"], 500)
    assert_equal(
        result.allele_to_allele_specific_models[
            "HLA-A*03:01"][0].hyperparameters["max_epochs"], 500)

    print("Deleting: %s" % models_dir1)
    shutil.rmtree(models_dir1)
    # BUGFIX: models_dir2 was printed as deleted but never actually removed,
    # leaking a temporary directory on every test run.
    print("Deleting: %s" % models_dir2)
    shutil.rmtree(models_dir2)
if os.environ.get("KERAS_BACKEND") != "theano":
def test_run_parallel():
run_and_check(n_jobs=3)
run_and_check(n_jobs=2)
def test_run_serial():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment