From 11ad19cd4d9ad96903242b9f8a8c87a064ccf6db Mon Sep 17 00:00:00 2001 From: Tim O'Donnell <timodonnell@gmail.com> Date: Mon, 19 Feb 2018 21:39:27 -0500 Subject: [PATCH] Basic support for model selection --- .../calibrate_percentile_ranks_command.py | 173 +++++++++ mhcflurry/class1_affinity_predictor.py | 130 ++++++- mhcflurry/common.py | 2 + mhcflurry/parallelism.py | 16 + .../select_allele_specific_models_command.py | 348 ++++++++++++++++++ .../train_allele_specific_models_command.py | 183 ++++----- setup.py | 4 + ...st_train_allele_specific_models_command.py | 94 ----- test/test_train_and_related_commands.py | 167 +++++++++ 9 files changed, 892 insertions(+), 225 deletions(-) create mode 100644 mhcflurry/calibrate_percentile_ranks_command.py create mode 100644 mhcflurry/select_allele_specific_models_command.py delete mode 100644 test/test_train_allele_specific_models_command.py create mode 100644 test/test_train_and_related_commands.py diff --git a/mhcflurry/calibrate_percentile_ranks_command.py b/mhcflurry/calibrate_percentile_ranks_command.py new file mode 100644 index 00000000..073f7ff5 --- /dev/null +++ b/mhcflurry/calibrate_percentile_ranks_command.py @@ -0,0 +1,173 @@ +""" +Calibrate percentile ranks for models. Runs in-place. +""" +import argparse +import os +import signal +import sys +import time +import traceback +import random +from functools import partial + +import numpy +import pandas +import yaml +from sklearn.metrics.pairwise import cosine_similarity +from mhcnames import normalize_allele_name +import tqdm # progress bar +tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481 + +from .class1_affinity_predictor import Class1AffinityPredictor +from .common import configure_logging +from .parallelism import ( + make_worker_pool, call_wrapped) + + +# To avoid pickling large matrices to send to child processes when running in +# parallel, we use this global variable as a place to store data. 
Data that is +# stored here before creating the thread pool will be inherited to the child +# processes upon fork() call, allowing us to share large data with the workers +# via shared memory. +GLOBAL_DATA = {} + +parser = argparse.ArgumentParser(usage=__doc__) + + +parser.add_argument( + "--models-dir", + metavar="DIR", + required=True, + help="Directory to read and write models") +parser.add_argument( + "--allele", + default=None, + nargs="+", + help="Alleles to train models for. If not specified, all alleles with " + "enough measurements will be used.") +parser.add_argument( + "--num-peptides-per-length", + type=int, + metavar="N", + default=int(1e5), + help="Number of peptides per length to use to calibrate percent ranks. " + "Default: %(default)s.") +parser.add_argument( + "--num-jobs", + default=1, + type=int, + metavar="N", + help="Number of processes to parallelize over. " + "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.") +parser.add_argument( + "--max-tasks-per-worker", + type=int, + metavar="N", + default=None, + help="Restart workers after N tasks. Workaround for tensorflow memory " + "leaks. Requires Python >=3.2.") +parser.add_argument( + "--verbosity", + type=int, + help="Keras verbosity. Default: %(default)s", + default=0) + + +def run(argv=sys.argv[1:]): + global GLOBAL_DATA + + # On sigusr1 print stack trace + print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid()) + signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack()) + + args = parser.parse_args(argv) + + args.models_dir = os.path.abspath(args.models_dir) + + configure_logging(verbose=args.verbosity > 1) + + predictor = Class1AffinityPredictor.load(args.models_dir) + + if args.allele: + alleles = [normalize_allele_name(a) for a in args.allele] + else: + alleles = predictor.supported_alleles + + start = time.time() + + print("Performing percent rank calibration. 
Encoding peptides.") + encoded_peptides = predictor.calibrate_percentile_ranks( + alleles=[], # don't actually do any calibration, just return peptides + num_peptides_per_length=args.num_peptides_per_length) + + # Now we encode the peptides for each neural network, so the encoding + # becomes cached. + for network in predictor.neural_networks: + network.peptides_to_network_input(encoded_peptides) + assert encoded_peptides.encoding_cache # must have cached the encoding + print("Finished encoding peptides for percent ranks in %0.2f sec." % ( + time.time() - start)) + print("Calibrating percent rank calibration for %d alleles." % len(alleles)) + + if args.num_jobs == 1: + # Serial run + print("Running in serial.") + worker_pool = None + results = ( + calibrate_percentile_ranks( + allele=allele, + predictor=predictor, + peptides=encoded_peptides) + for allele in alleles) + else: + # Parallel run + # Store peptides in global variable so they are in shared memory + # after fork, instead of needing to be pickled. + GLOBAL_DATA["calibration_peptides"] = encoded_peptides + + worker_pool = make_worker_pool( + processes=( + args.num_jobs + if args.num_jobs else None), + max_tasks_per_worker=args.max_tasks_per_worker) + + results = worker_pool.imap_unordered( + partial( + partial(call_wrapped, calibrate_percentile_ranks), + predictor=predictor), + alleles, + chunksize=1) + + for result in tqdm.tqdm(results, total=len(alleles)): + predictor.allele_to_percent_rank_transform.update(result) + print("Done calibrating %d additional alleles." % len(alleles)) + predictor.save(args.models_dir, model_names_to_write=[]) + + percent_rank_calibration_time = time.time() - start + + if worker_pool: + worker_pool.close() + worker_pool.join() + + print("Percent rank calibration time: %0.2f min." 
% ( + percent_rank_calibration_time / 60.0)) + print("Predictor written to: %s" % args.models_dir) + + +def calibrate_percentile_ranks(allele, predictor, peptides=None): + """ + Private helper function. + """ + global GLOBAL_DATA + if peptides is None: + peptides = GLOBAL_DATA["calibration_peptides"] + predictor.calibrate_percentile_ranks( + peptides=peptides, + alleles=[allele]) + return { + allele: predictor.allele_to_percent_rank_transform[allele], + } + + +if __name__ == '__main__': + run() diff --git a/mhcflurry/class1_affinity_predictor.py b/mhcflurry/class1_affinity_predictor.py index a691979c..50c13d6d 100644 --- a/mhcflurry/class1_affinity_predictor.py +++ b/mhcflurry/class1_affinity_predictor.py @@ -47,7 +47,8 @@ class Class1AffinityPredictor(object): class1_pan_allele_models=None, allele_to_fixed_length_sequence=None, manifest_df=None, - allele_to_percent_rank_transform=None): + allele_to_percent_rank_transform=None, + metadata_dataframes=None): """ Parameters ---------- @@ -68,6 +69,10 @@ class Class1AffinityPredictor(object): allele_to_percent_rank_transform : dict of string -> `PercentRankTransform`, optional `PercentRankTransform` instances to use for each allele + + metadata_dataframes : dict of string -> pandas.DataFrame, optional + Optional additional dataframes to write to the models dir when + save() is called. Useful for tracking provenance. 
""" if allele_to_allele_specific_models is None: @@ -107,6 +112,7 @@ class Class1AffinityPredictor(object): if not allele_to_percent_rank_transform: allele_to_percent_rank_transform = {} self.allele_to_percent_rank_transform = allele_to_percent_rank_transform + self.metadata_dataframes = metadata_dataframes self._cache = {} def clear_cache(self): @@ -264,7 +270,7 @@ class Class1AffinityPredictor(object): self._cache["supported_peptide_lengths"] = result return self._cache["supported_peptide_lengths"] - def save(self, models_dir, model_names_to_write=None): + def save(self, models_dir, model_names_to_write=None, write_metadata=True): """ Serialize the predictor to a directory on disk. If the directory does not exist it will be created. @@ -284,6 +290,9 @@ class Class1AffinityPredictor(object): model_names_to_write : list of string, optional Only write the weights for the specified models. Useful for incremental updates during training. + + write_metadata : boolean, optional + Whether to write optional metadata """ num_models = len(self.class1_pan_allele_models) + sum( len(v) for v in self.allele_to_allele_specific_models.values()) @@ -315,16 +324,22 @@ class Class1AffinityPredictor(object): write_manifest_df.to_csv(manifest_path, index=False) logging.info("Wrote: %s" % manifest_path) - # Write "info.txt" - info_path = join(models_dir, "info.txt") - rows = [ - ("trained on", time.asctime()), - ("package ", "mhcflurry %s" % __version__), - ("hostname ", gethostname()), - ("user ", getuser()), - ] - pandas.DataFrame(rows).to_csv( - info_path, sep="\t", header=False, index=False) + if write_metadata: + # Write "info.txt" + info_path = join(models_dir, "info.txt") + rows = [ + ("trained on", time.asctime()), + ("package ", "mhcflurry %s" % __version__), + ("hostname ", gethostname()), + ("user ", getuser()), + ] + pandas.DataFrame(rows).to_csv( + info_path, sep="\t", header=False, index=False) + + if self.metadata_dataframes: + for (name, df) in 
self.metadata_dataframes.items(): + metadata_df_path = join(models_dir, "%s.csv.bz2" % name) + df.to_csv(metadata_df_path, index=False, compression="bz2") if self.allele_to_fixed_length_sequence is not None: allele_to_sequence_df = pandas.DataFrame( @@ -1132,4 +1147,93 @@ class Class1AffinityPredictor(object): allele_to_allele_specific_models=allele_to_allele_specific_models, class1_pan_allele_models=class1_pan_allele_models, allele_to_fixed_length_sequence=self.allele_to_fixed_length_sequence, - ) \ No newline at end of file + ) + + def model_select( + self, + score_function, + alleles=None, + min_models=1, + max_models=10000): + """ + Perform model selection using a user-specified scoring function. + + Model selection is done using a "step up" variable selection procedure, + in which models are repeatedly added to an ensemble until the score + stops improving. + + Parameters + ---------- + score_function : Class1AffinityPredictor -> float function + Scoring function + + alleles : list of string, optional + If not specified, model selection is performed for all alleles. 
+ + min_models : int, optional + Min models to select per allele + + max_models : int, optional + Max models to select per allele + + Returns + ------- + Class1AffinityPredictor : predictor containing the selected models + """ + + if alleles is None: + alleles = self.supported_alleles + + dfs = [] + allele_to_allele_specific_models = {} + for allele in alleles: + df = pandas.DataFrame({ + 'model': self.allele_to_allele_specific_models[allele] + }) + df["model_num"] = df.index + df["allele"] = allele + df["selected"] = False + + round_num = 1 + + while not df.selected.all() and sum(df.selected) < max_models: + score_col = "score_%2d" % round_num + prev_score_col = "score_%2d" % (round_num - 1) + + existing_selected = list(df[df.selected].model) + df[score_col] = [ + numpy.nan if row.selected else + score_function( + Class1AffinityPredictor( + allele_to_allele_specific_models={ + allele: [row.model] + existing_selected + })) + for (_, row) in df.iterrows() + ] + + if round_num > min_models and ( + df[score_col].max() < df[prev_score_col].max()): + break + + # In case of a tie, pick a model at random. 
+ (best_model_index,) = df.loc[ + (df[score_col] == df[score_col].max()) + ].sample(1).index + df.loc[best_model_index, "selected"] = True + round_num += 1 + + dfs.append(df) + print("Selected %d models for allele %s" % ( + df.selected.sum(), allele)) + allele_to_allele_specific_models[allele] = list( + df.loc[df.selected].model) + + df = pandas.concat(dfs, ignore_index=True) + + new_predictor = Class1AffinityPredictor( + allele_to_allele_specific_models, + metadata_dataframes={ + "model_selection": df, + }) + return new_predictor + diff --git a/mhcflurry/common.py b/mhcflurry/common.py index 4b6f6d56..4d4afe01 100644 --- a/mhcflurry/common.py +++ b/mhcflurry/common.py @@ -3,6 +3,8 @@ import collections import logging import sys import os +from struct import unpack +from hashlib import sha256 import numpy import pandas diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py index f4d0d910..10da1f80 100644 --- a/mhcflurry/parallelism.py +++ b/mhcflurry/parallelism.py @@ -4,6 +4,11 @@ from multiprocessing import Pool, Queue, cpu_count from six.moves import queue from multiprocessing.util import Finalize from pprint import pprint +import random + +import numpy + +from .common import set_keras_backend def make_worker_pool( @@ -102,6 +107,17 @@ def worker_init_entry_point( init_function(**kwargs) +def worker_init(keras_backend=None, gpu_device_nums=None): + # Each worker needs distinct random numbers + numpy.random.seed() + random.seed() + if keras_backend or gpu_device_nums: + print("WORKER pid=%d assigned GPU devices: %s" % ( + os.getpid(), gpu_device_nums)) + set_keras_backend( + keras_backend, gpu_device_nums=gpu_device_nums) + + # Solution suggested in https://bugs.python.org/issue13831 class WrapException(Exception): """ diff --git a/mhcflurry/select_allele_specific_models_command.py b/mhcflurry/select_allele_specific_models_command.py new file mode 100644 index 00000000..78650c16 --- /dev/null +++ b/mhcflurry/select_allele_specific_models_command.py @@ 
-0,0 +1,348 @@ +""" +Model select class1 single allele models. +""" +import argparse +import os +import signal +import sys +import time +import traceback +import random +from functools import partial + +import pandas +from scipy.stats import kendalltau + +from mhcnames import normalize_allele_name +import tqdm # progress bar +tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481 + +from .class1_affinity_predictor import Class1AffinityPredictor +from .encodable_sequences import EncodableSequences +from .common import configure_logging, random_peptides +from .parallelism import make_worker_pool +from .regression_target import from_ic50 + + +# To avoid pickling large matrices to send to child processes when running in +# parallel, we use this global variable as a place to store data. Data that is +# stored here before creating the thread pool will be inherited to the child +# processes upon fork() call, allowing us to share large data with the workers +# via shared memory. +GLOBAL_DATA = {} + + +parser = argparse.ArgumentParser(usage=__doc__) + +parser.add_argument( + "--data", + metavar="FILE.csv", + required=False, + help=( + "Model selection data CSV. Expected columns: " + "allele, peptide, measurement_value")) +parser.add_argument( + "--exclude-data", + metavar="FILE.csv", + required=False, + help=( + "Data to EXCLUDE from model selection. Useful to specify the original " + "training data used")) +parser.add_argument( + "--models-dir", + metavar="DIR", + required=True, + help="Directory to read models") +parser.add_argument( + "--out-models-dir", + metavar="DIR", + required=True, + help="Directory to write selected models") +parser.add_argument( + "--allele", + default=None, + nargs="+", + help="Alleles to select models for. 
If not specified, all alleles with " + "enough measurements will be used.") +parser.add_argument( + "--min-measurements-per-allele", + type=int, + metavar="N", + default=50, + help="Min number of data points required for data-driven model selection") +parser.add_argument( + "--min-models", + type=int, + default=8, + metavar="N", + help="Min number of models to select per allele") +parser.add_argument( + "--max-models", + type=int, + default=15, + metavar="N", + help="Max number of models to select per allele") +parser.add_argument( + "--scoring", + nargs="+", + choices=("mse", "mass-spec", "consensus"), + default=["mse", "consensus"], + help="Scoring procedures to use in order") +parser.add_argument( + "--consensus-num-peptides-per-length", + type=int, + default=100000, + help="Num peptides per length to use for consensus scoring") +parser.add_argument( + "--num-jobs", + default=1, + type=int, + metavar="N", + help="Number of processes to parallelize selection over. " + "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.") +parser.add_argument( + "--backend", + choices=("tensorflow-gpu", "tensorflow-cpu", "tensorflow-default"), + help="Keras backend. If not specified will use system default.") +parser.add_argument( + "--verbosity", + type=int, + help="Keras verbosity. 
Default: %(default)s", + default=0) + + +def run(argv=sys.argv[1:]): + global GLOBAL_DATA + + # On sigusr1 print stack trace + print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid()) + signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack()) + + args = parser.parse_args(argv) + + args.out_models_dir = os.path.abspath(args.out_models_dir) + + configure_logging(verbose=args.verbosity > 1) + + input_predictor = Class1AffinityPredictor.load(args.models_dir) + print("Loaded: %s" % input_predictor) + + if args.allele: + alleles = [normalize_allele_name(a) for a in args.allele] + else: + alleles = input_predictor.supported_alleles + + if args.data: + df = pandas.read_csv(args.data) + print("Loaded data: %s" % (str(df.shape))) + + df = df.ix[ + (df.peptide.str.len() >= 8) & (df.peptide.str.len() <= 15) + ] + print("Subselected to 8-15mers: %s" % (str(df.shape))) + + # Allele names in data are assumed to be already normalized. + df = df.loc[df.allele.isin(alleles)].dropna() + print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles))) + + metadata_dfs = {} + if args.exclude_data: + exclude_df = pandas.read_csv(args.exclude_data) + metadata_dfs["model_selection_exclude"] = exclude_df + print("Loaded exclude data: %s" % (str(df.shape))) + + df["_key"] = df.allele + "__" + df.peptide + exclude_df["_key"] = exclude_df.allele + "__" + exclude_df.peptide + df["_excluded"] = df._key.isin(exclude_df._key.unique()) + print("Excluding measurements per allele (counts): ") + print(df.groupby("allele")._excluded.sum()) + + print("Excluding measurements per allele (fractions): ") + print(df.groupby("allele")._excluded.mean()) + + df = df.loc[~df._excluded] + print("Reduced data to: %s" % (str(df.shape))) + else: + df = None + + model_selection_kwargs = { + 'min_models': args.min_models, + 'max_models': args.max_models, + } + + selectors = {} + for scoring in args.scoring: + if scoring == "mse": + selector = MSEModelSelector( + df=df, + 
predictor=input_predictor, + min_measurements=args.min_measurements_per_allele, + model_selection_kwargs=model_selection_kwargs) + elif scoring == "consensus": + selector = ConsensusModelSelector( + predictor=input_predictor, + num_peptides_per_length=args.consensus_num_peptides_per_length, + model_selection_kwargs=model_selection_kwargs) + selectors[scoring] = selector + + print("Selectors for alleles:") + allele_to_selector = {} + for allele in alleles: + selector = None + for possible_selector in args.scoring: + if selectors[possible_selector].usable_for_allele(allele=allele): + selector = selectors[possible_selector] + print("%20s %s" % (allele, possible_selector)) + break + if selector is None: + raise ValueError("No selectors usable for allele: %s" % allele) + allele_to_selector[allele] = selector + + GLOBAL_DATA["allele_to_selector"] = allele_to_selector + + if not os.path.exists(args.out_models_dir): + print("Attempting to create directory: %s" % args.out_models_dir) + os.mkdir(args.out_models_dir) + print("Done.") + + metadata_dfs["model_selection_data"] = df + result_predictor = Class1AffinityPredictor(metadata_dataframes=metadata_dfs) + + start = time.time() + if args.num_jobs == 1: + # Serial run + print("Running in serial.") + worker_pool = None + results = ( + model_select(allele) for allele in alleles) + else: + worker_pool = make_worker_pool( + processes=( + args.num_jobs + if args.num_jobs else None), + max_tasks_per_worker=args.max_tasks_per_worker) + + random.shuffle(alleles) + results = worker_pool.imap_unordered( + model_select, + alleles, + chunksize=1) + + for result in tqdm.tqdm(results, total=len(alleles)): + result_predictor.merge_in_place([result]) + + print("Done model selecting for %d alleles." % len(alleles)) + result_predictor.save(args.out_models_dir) + + model_selection_time = time.time() - start + + if worker_pool: + worker_pool.close() + worker_pool.join() + + print("Model selection time %0.2f min." 
% (model_selection_time / 60.0)) + print("Predictor written to: %s" % args.models_dir) + + +def model_select(allele): + global GLOBAL_DATA + selector = GLOBAL_DATA["allele_to_selector"][allele] + return selector.select(allele) + + +class ConsensusModelSelector(object): + def __init__( + self, + predictor, + num_peptides_per_length=100000, + model_selection_kwargs={}): + + (min_length, max_length) = predictor.supported_peptide_lengths + peptides = [] + for length in range(min_length, max_length + 1): + peptides.extend( + random_peptides(num_peptides_per_length, length=length)) + + self.peptides = EncodableSequences.create(peptides) + self.predictor = predictor + self.model_selection_kwargs = model_selection_kwargs + + # Run predictions for one allele so self.peptides caches the encoding. + self.predictor.predict( + allele=self.predictor.supported_alleles[0], + peptides=self.peptides) + + def usable_for_allele(self, allele): + return True + + def score_function(self, allele, ensemble_predictions, predictor): + predictions = predictor.predict( + allele=allele, + peptides=self.peptides, + ) + return kendalltau(predictions, ensemble_predictions).correlation + + def select(self, allele): + full_ensemble_predictions = self.predictor.predict( + allele=allele, + peptides=self.peptides) + + return self.predictor.model_select( + score_function=partial( + self.score_function, allele, full_ensemble_predictions), + alleles=[allele], + **self.model_selection_kwargs + ) + + +class MSEModelSelector(object): + def __init__( + self, + df, + predictor, + model_selection_kwargs={}, + min_measurements=1): + + self.df = df + self.predictor = predictor + self.model_selection_kwargs = model_selection_kwargs + self.min_measurements = min_measurements + + def usable_for_allele(self, allele): + return (self.df.allele == allele).sum() >= self.min_measurements + + @staticmethod + def score_function(allele, sub_df, peptides, predictor): + predictions = predictor.predict( + allele=allele, + 
peptides=peptides, + ) + deviations = from_ic50(predictions) - from_ic50(sub_df.measurement_value) + + if 'measurement_inequality' in sub_df.columns: + # Must reverse meaning of inequality since we are working with + # transformed 0-1 values, which are anti-correlated with the ic50s. + # The measurement_inequality column is given in terms of ic50s. + deviations.loc[ + ((sub_df.measurement_inequality == "<") & (deviations > 0)) | + ((sub_df.measurement_inequality == ">") & (deviations < 0)) + ] = 0.0 + + return -1 * (deviations**2).mean() + + def select(self, allele): + sub_df = self.df.loc[self.df.allele == allele] + peptides = EncodableSequences.create(sub_df.peptide.values) + + return self.predictor.model_select( + score_function=partial( + self.score_function, allele, sub_df, peptides), + alleles=[allele], + **self.model_selection_kwargs + ) + + + + +if __name__ == '__main__': + run() diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py index d06016e2..d3d2adfc 100644 --- a/mhcflurry/train_allele_specific_models_command.py +++ b/mhcflurry/train_allele_specific_models_command.py @@ -10,10 +10,10 @@ import traceback import random from functools import partial -import numpy import pandas import yaml from sklearn.metrics.pairwise import cosine_similarity +from sklearn.model_selection import StratifiedKFold from mhcnames import normalize_allele_name import tqdm # progress bar tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481 @@ -21,7 +21,7 @@ tqdm.monitor_interval = 0 # see https://github.com/tqdm/tqdm/issues/481 from .class1_affinity_predictor import Class1AffinityPredictor from .common import configure_logging, set_keras_backend from .parallelism import ( - make_worker_pool, cpu_count, call_wrapped, call_wrapped_kwargs) + make_worker_pool, cpu_count, call_wrapped_kwargs, worker_init) from .hyperparameters import HyperparameterDefaults from .allele_encoding import AlleleEncoding @@ 
-70,19 +70,25 @@ parser.add_argument( metavar="N", default=50, help="Train models for alleles with >=N measurements.") +parser.add_argument( + "--held-out-fraction-reciprocal", + type=int, + metavar="N", + default=None, + help="Hold out 1/N fraction of data (for e.g. subsequent model selection. " + "For example, specify 5 to hold out 20% of the data.") +parser.add_argument( + "--held-out-fraction-seed", + type=int, + metavar="N", + default=0, + help="Seed for randomizing which measurements are held out. Only matters " + "when --held-out-fraction is specified. Default: %(default)s.") parser.add_argument( "--ignore-inequalities", action="store_true", default=False, help="Do not use affinity value inequalities even when present in data") -parser.add_argument( - "--percent-rank-calibration-num-peptides-per-length", - type=int, - metavar="N", - default=int(1e5), - help="Number of peptides per length to use to calibrate percent ranks. " - "Set to 0 to disable percent rank calibration. The resulting models will " - "not support percent ranks. Default: %(default)s.") parser.add_argument( "--n-models", type=int, @@ -100,20 +106,12 @@ parser.add_argument( "--allele-sequences", metavar="FILE.csv", help="Allele sequences file. Used for computing allele similarity matrix.") -parser.add_argument( - "--verbosity", - type=int, - help="Keras verbosity. Default: %(default)s", - default=0) parser.add_argument( "--num-jobs", - default=[1], + default=1, type=int, metavar="N", - nargs="+", - help="Number of processes to parallelize training and percent rank " - "calibration over, respectively. Experimental. If only one value is specified " - "then the same number of jobs is used for both phases." + help="Number of processes to parallelize training over. Experimental. " "Set to 1 for serial run. Set to 0 to use number of cores. Default: %(default)s.") parser.add_argument( "--backend", @@ -146,6 +144,11 @@ parser.add_argument( default=None, help="Restart workers after N tasks. 
Workaround for tensorflow memory " "leaks. Requires Python >=3.2.") +parser.add_argument( + "--verbosity", + type=int, + help="Keras verbosity. Default: %(default)s", + default=0) TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults( @@ -196,8 +199,14 @@ def run(argv=sys.argv[1:]): # Allele names in data are assumed to be already normalized. df = df.loc[df.allele.isin(alleles)].dropna() - print("Selected %d alleles: %s" % (len(alleles), ' '.join(alleles))) + + if args.held_out_fraction_reciprocal: + df = subselect_df_held_out( + df, + recriprocal_held_out_fraction=args.held_out_fraction_reciprocal, + seed=args.held_out_fraction_seed) + print("Training data: %s" % (str(df.shape))) GLOBAL_DATA["train_data"] = df @@ -208,8 +217,11 @@ def run(argv=sys.argv[1:]): os.mkdir(args.out_models_dir) print("Done.") - predictor = Class1AffinityPredictor() - serial_run = args.num_jobs[0] == 1 + predictor = Class1AffinityPredictor( + metadata_dataframes={ + 'train_data': df, + }) + serial_run = args.num_jobs == 1 work_items = [] for (h, hyperparameters) in enumerate(hyperparameters_lst): @@ -219,14 +231,15 @@ def run(argv=sys.argv[1:]): if args.n_models: n_models = args.n_models if not n_models: - raise ValueError("Specify --ensemble-size or n_models hyperparameter") + raise ValueError( + "Specify --ensemble-size or n_models hyperparameter") if args.max_epochs: hyperparameters['max_epochs'] = args.max_epochs - hyperparameters['train_data'] = TRAIN_DATA_HYPERPARAMETER_DEFAULTS.with_defaults( - hyperparameters.get('train_data', {}) - ) + hyperparameters['train_data'] = ( + TRAIN_DATA_HYPERPARAMETER_DEFAULTS.with_defaults( + hyperparameters.get('train_data', {}))) if hyperparameters['train_data']['pretrain_min_points'] and ( 'allele_similarity_matrix' not in GLOBAL_DATA): @@ -281,7 +294,7 @@ def run(argv=sys.argv[1:]): set_keras_backend(args.backend) else: # Parallel run. 
- num_workers = args.num_jobs[0] if args.num_jobs[0] else cpu_count() + num_workers = args.num_jobs if args.num_jobs else cpu_count() worker_init_kwargs = None if args.gpus: print("Attempting to round-robin assign each worker a GPU.") @@ -342,7 +355,9 @@ def run(argv=sys.argv[1:]): save_start = time.time() new_model_names = predictor.merge_in_place(unsaved_predictors) predictor.save( - args.out_models_dir, model_names_to_write=new_model_names) + args.out_models_dir, + model_names_to_write=new_model_names, + write_metadata=False) print( "Saved predictor (%d models total) including %d new models " "in %0.2f sec to %s" % ( @@ -353,10 +368,7 @@ def run(argv=sys.argv[1:]): unsaved_predictors = [] last_save_time = time.time() - print("Saving final predictor to: %s" % args.out_models_dir) predictor.merge_in_place(unsaved_predictors) - predictor.save(args.out_models_dir) # write all models just to be sure - print("Done.") else: # Run in serial. In this case, every worker is passed the same predictor, @@ -368,77 +380,20 @@ def run(argv=sys.argv[1:]): assert work_predictor is predictor assert not work_items + print("Saving final predictor to: %s" % args.out_models_dir) + predictor.save(args.out_models_dir) # write all models just to be sure + print("Done.") + print("*" * 30) training_time = time.time() - start print("Trained affinity predictor with %d networks in %0.2f min." % ( len(predictor.neural_networks), training_time / 60.0)) print("*" * 30) - if worker_pool: - worker_pool.close() - worker_pool.join() - worker_pool = None - - start = time.time() - if args.percent_rank_calibration_num_peptides_per_length > 0: - alleles = list(predictor.supported_alleles) - - print("Performing percent rank calibration. 
Encoding peptides.") - encoded_peptides = predictor.calibrate_percentile_ranks( - alleles=[], # don't actually do any calibration, just return peptides - num_peptides_per_length=args.percent_rank_calibration_num_peptides_per_length) - - # Now we encode the peptides for each neural network, so the encoding - # becomes cached. - for network in predictor.neural_networks: - network.peptides_to_network_input(encoded_peptides) - assert encoded_peptides.encoding_cache # must have cached the encoding - print("Finished encoding peptides for percent ranks in %0.2f sec." % ( - time.time() - start)) - print("Calibrating percent rank calibration for %d alleles." % len(alleles)) - - if args.num_jobs[-1] == 1: - # Serial run - print("Running in serial.") - worker_pool = None - results = ( - calibrate_percentile_ranks( - allele=allele, - predictor=predictor, - peptides=encoded_peptides) - for allele in alleles) - else: - # Parallel run - # Store peptides in global variable so they are in shared memory - # after fork, instead of needing to be pickled. - GLOBAL_DATA["calibration_peptides"] = encoded_peptides - - worker_pool = make_worker_pool( - processes=( - args.num_jobs[-1] - if args.num_jobs[-1] else None), - max_tasks_per_worker=args.max_tasks_per_worker) - - results = worker_pool.imap_unordered( - partial( - partial(call_wrapped, calibrate_percentile_ranks), - predictor=predictor), - alleles, - chunksize=1) - - for result in tqdm.tqdm(results, total=len(alleles)): - predictor.allele_to_percent_rank_transform.update(result) - print("Done calibrating %d additional alleles." % len(alleles)) - predictor.save(args.out_models_dir, model_names_to_write=[]) - - percent_rank_calibration_time = time.time() - start - if worker_pool: worker_pool.close() worker_pool.join() - print("Train time: %0.2f min. Percent rank calibration time: %0.2f min." 
% ( - training_time / 60.0, percent_rank_calibration_time / 60.0)) print("Predictor written to: %s" % args.out_models_dir) @@ -448,7 +403,8 @@ def alleles_by_similarity(allele): if allele not in allele_similarity.columns: # Use random alleles print("No similar alleles for: %s" % allele) - return [allele] + list(allele_similarity.columns.to_series().sample(frac=1.0)) + return [allele] + list( + allele_similarity.columns.to_series().sample(frac=1.0)) return ( allele_similarity[allele] + ( allele_similarity.index == allele) # force that we return specified allele first @@ -528,30 +484,21 @@ def train_model( return predictor -def calibrate_percentile_ranks(allele, predictor, peptides=None): - """ - Private helper function. - """ - global GLOBAL_DATA - if peptides is None: - peptides = GLOBAL_DATA["calibration_peptides"] - predictor.calibrate_percentile_ranks( - peptides=peptides, - alleles=[allele]) - return { - allele: predictor.allele_to_percent_rank_transform[allele], - } - - -def worker_init(keras_backend=None, gpu_device_nums=None): - # Each worker needs distinct random numbers - numpy.random.seed() - random.seed() - if keras_backend or gpu_device_nums: - print("WORKER pid=%d assigned GPU devices: %s" % ( - os.getpid(), gpu_device_nums)) - set_keras_backend( - keras_backend, gpu_device_nums=gpu_device_nums) +def subselect_df_held_out(df, recriprocal_held_out_fraction=10, seed=0): + kf = StratifiedKFold( + n_splits=recriprocal_held_out_fraction, + shuffle=True, + random_state=seed) + + # Stratify by both allele and binder vs. nonbinder. 
+ df["key"] = [ + "%s_%s" % ( + row.allele, + "binder" if row.measurement_value <= 500 else "nonbinder") + for (_, row) in df.iterrows() + ] + (train, test) = next(kf.split(df, df.key)) + return df.iloc[train] if __name__ == '__main__': run() diff --git a/setup.py b/setup.py index 9076caf1..fe12d963 100644 --- a/setup.py +++ b/setup.py @@ -78,6 +78,10 @@ if __name__ == '__main__': 'mhcflurry-predict = mhcflurry.predict_command:run', 'mhcflurry-class1-train-allele-specific-models = ' 'mhcflurry.train_allele_specific_models_command:run', + 'mhcflurry-class1-select-allele-specific-models = ' + 'mhcflurry.select_allele_specific_models_command:run', + 'mhcflurry-calibrate-percentile-ranks = ' + 'mhcflurry.calibrate_percentile_ranks_command:run', ] }, classifiers=[ diff --git a/test/test_train_allele_specific_models_command.py b/test/test_train_allele_specific_models_command.py deleted file mode 100644 index ab79e680..00000000 --- a/test/test_train_allele_specific_models_command.py +++ /dev/null @@ -1,94 +0,0 @@ -import json -import os -import shutil -import tempfile -import subprocess - -from numpy.testing import assert_array_less, assert_equal - -from mhcflurry import Class1AffinityPredictor -from mhcflurry.downloads import get_path - -HYPERPARAMETERS = [ - { - "n_models": 2, - "max_epochs": 100, - "patience": 10, - "early_stopping": True, - "validation_split": 0.2, - - "random_negative_rate": 0.0, - "random_negative_constant": 25, - - "peptide_amino_acid_encoding": "BLOSUM62", - "use_embedding": False, - "kmer_size": 15, - "batch_normalization": False, - "locally_connected_layers": [ - { - "filters": 8, - "activation": "tanh", - "kernel_size": 3 - }, - { - "filters": 8, - "activation": "tanh", - "kernel_size": 3 - } - ], - "activation": "relu", - "output_activation": "sigmoid", - "layer_sizes": [ - 32 - ], - "random_negative_affinity_min": 20000.0, - "random_negative_affinity_max": 50000.0, - "dense_layer_l1_regularization": 0.001, - "dropout_probability": 0.0 - } -] 
"""
Test train, calibrate percentile ranks, and model selection commands.

These are end-to-end tests: they shell out to the installed mhcflurry
command-line entry points and therefore require the curated training data
download to be available (via mhcflurry.downloads.get_path).
"""

import json
import os
import shutil
import tempfile
import subprocess
from copy import deepcopy

from numpy.testing import assert_array_less, assert_equal

from mhcflurry import Class1AffinityPredictor
from mhcflurry.downloads import get_path

# A single small architecture; kept deliberately tiny (one locally connected
# layer, 16-unit dense layer) so the end-to-end train test runs quickly.
HYPERPARAMETERS = [
    {
        "n_models": 2,
        "max_epochs": 500,
        "patience": 10,
        "minibatch_size": 128,
        "early_stopping": True,
        "validation_split": 0.2,

        "random_negative_rate": 0.0,
        "random_negative_constant": 25,

        "peptide_amino_acid_encoding": "BLOSUM62",
        "use_embedding": False,
        "kmer_size": 15,
        "batch_normalization": False,
        "locally_connected_layers": [
            {
                "filters": 8,
                "activation": "tanh",
                "kernel_size": 3
            }
        ],
        "activation": "tanh",
        "output_activation": "sigmoid",
        "layer_sizes": [
            16
        ],
        "random_negative_affinity_min": 20000.0,
        "random_negative_affinity_max": 50000.0,
        "dense_layer_l1_regularization": 0.001,
        "dropout_probability": 0.0
    }
]


def run_and_check(n_jobs=0):
    """
    Train models for two alleles, calibrate percentile ranks in place, then
    verify that the resulting predictor gives a strong-binder prediction and
    percentile ranks for a known HLA-A*02:01 binder (SLYNTVATL).

    Parameters
    ----------
    n_jobs : int
        Passed through as --num-jobs to the commands. 0 means "use all
        cores"; 1 means serial.
    """
    models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    hyperparameters_filename = os.path.join(
        models_dir, "hyperparameters.yaml")
    # JSON is valid YAML, so the trainer's YAML loader accepts this file.
    with open(hyperparameters_filename, "w") as fd:
        json.dump(HYPERPARAMETERS, fd)

    args = [
        "mhcflurry-class1-train-allele-specific-models",
        "--data", get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
        "--hyperparameters", hyperparameters_filename,
        "--allele", "HLA-A*02:01", "HLA-A*03:01",
        "--out-models-dir", models_dir,
        "--num-jobs", str(n_jobs),
        "--ignore-inequalities",
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    # Calibrate percentile ranks (runs in place on models_dir).
    args = [
        "mhcflurry-calibrate-percentile-ranks",
        "--models-dir", models_dir,
        "--num-peptides-per-length", "10000",
        "--num-jobs", str(n_jobs),
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    result = Class1AffinityPredictor.load(models_dir)
    predictions = result.predict(
        peptides=["SLYNTVATL"],
        alleles=["HLA-A*02:01"])
    assert_equal(predictions.shape, (1,))
    # SLYNTVATL is a well-known strong A*02:01 binder: expect < 500 nM.
    assert_array_less(predictions, 500)
    df = result.predict_to_dataframe(
        peptides=["SLYNTVATL"],
        alleles=["HLA-A*02:01"])
    print(df)
    # Percentile-rank calibration must have populated this column.
    assert "prediction_percentile" in df.columns

    print("Deleting: %s" % models_dir)
    shutil.rmtree(models_dir)


def test_run_and_check_with_model_selection(n_jobs=1):
    """
    Train with two architectures (one deliberately broken via max_epochs=0),
    then run model selection and verify the broken architecture is never
    selected.
    """
    models_dir1 = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    hyperparameters_filename = os.path.join(
        models_dir1, "hyperparameters.yaml")

    # Include one architecture that has max_epochs = 0. We check that it never
    # gets selected in model selection.
    hyperparameters = [
        deepcopy(HYPERPARAMETERS[0]),
        deepcopy(HYPERPARAMETERS[0]),
    ]
    hyperparameters[-1]["max_epochs"] = 0
    with open(hyperparameters_filename, "w") as fd:
        json.dump(hyperparameters, fd)

    args = [
        "mhcflurry-class1-train-allele-specific-models",
        "--data", get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
        "--hyperparameters", hyperparameters_filename,
        "--allele", "HLA-A*02:01", "HLA-A*03:01",
        "--out-models-dir", models_dir1,
        "--num-jobs", str(n_jobs),
        "--ignore-inequalities",
        "--held-out-fraction-reciprocal", "10",
        "--n-models", "1",
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    # 2 alleles x 2 architectures x 1 model each = 4 networks trained.
    result = Class1AffinityPredictor.load(models_dir1)
    assert_equal(len(result.neural_networks), 4)

    models_dir2 = tempfile.mkdtemp(prefix="mhcflurry-test-models")
    args = [
        "mhcflurry-class1-select-allele-specific-models",
        "--data",
        get_path("data_curated", "curated_training_data.no_mass_spec.csv.bz2"),
        "--exclude-data", models_dir1 + "/train_data.csv.bz2",
        "--out-models-dir", models_dir2,
        "--models-dir", models_dir1,
        "--num-jobs", str(n_jobs),
        "--max-models", "1"
    ]
    print("Running with args: %s" % args)
    subprocess.check_call(args)

    result = Class1AffinityPredictor.load(models_dir2)
    # One model selected per allele.
    assert_equal(len(result.neural_networks), 2)
    assert_equal(
        len(result.allele_to_allele_specific_models["HLA-A*02:01"]), 1)
    assert_equal(
        len(result.allele_to_allele_specific_models["HLA-A*03:01"]), 1)
    # The max_epochs=0 (untrained) architecture must never win selection.
    assert_equal(
        result.allele_to_allele_specific_models[
            "HLA-A*02:01"][0].hyperparameters["max_epochs"], 500)
    assert_equal(
        result.allele_to_allele_specific_models[
            "HLA-A*03:01"][0].hyperparameters["max_epochs"], 500)

    print("Deleting: %s" % models_dir1)
    print("Deleting: %s" % models_dir2)
    shutil.rmtree(models_dir1)
    # BUGFIX: models_dir2 was announced as deleted but never removed,
    # leaking a temporary directory on every test run.
    shutil.rmtree(models_dir2)


# Theano's backend does not tolerate the forked worker pool used by the
# parallel code path, so only register the parallel test elsewhere.
if os.environ.get("KERAS_BACKEND") != "theano":
    def test_run_parallel():
        run_and_check(n_jobs=2)


def test_run_serial():
    run_and_check(n_jobs=1)