Skip to content
Snippets Groups Projects
Commit 1d37637c authored by Tim O'Donnell's avatar Tim O'Donnell
Browse files

Remove antigen presentation submodule (for now)

parent 55b6e70c
No related branches found
No related tags found
No related merge requests found
Showing
with 0 additions and 1954 deletions
# Prediction of antigen presention
This submodule contains predictors for naturally presented MHC ligands. These predictors are typically trained on peptides eluted from cell surfaces and identified with mass-spec. The models combine MHC binding affinity with cleavage prediction and the level of expression of transcripts containing the given peptide.
This is a work in progress and not ready for production use.
from .presentation_model import PresentationModel, build_presentation_models
from .percent_rank_transform import PercentRankTransform
from . import presentation_component_models, decoy_strategies
__all__ = [
"PresentationModel",
"build_presentation_models",
"PercentRankTransform",
"presentation_component_models",
"decoy_strategies",
]
from .decoy_strategy import DecoyStrategy
from .same_transcripts_as_hits import SameTranscriptsAsHits
from .uniform_random import UniformRandom
__all__ = [
"DecoyStrategy",
"SameTranscriptsAsHits",
"UniformRandom",
]
import pandas
class DecoyStrategy(object):
"""
A mechanism for selecting decoys (non-hit peptides) given hits (
peptides detected via mass-spec).
Subclasses should override either decoys() or decoys_for_experiment().
Whichever one is not overriden is implemented using the other.
"""
def __init__(self):
pass
def decoys(self, hits_df):
"""
Given a df of hits with columns 'experiment_name' and 'peptide',
return a df with the same structure giving decoys.
Subclasses should override either this or decoys_for_experiment()
"""
assert 'experiment_name' in hits_df.columns
assert 'peptide' in hits_df.columns
assert len(hits_df) > 0
grouped = hits_df.groupby("experiment_name")
dfs = []
for (experiment_name, sub_df) in grouped:
decoys = self.decoys_for_experiment(
experiment_name,
sub_df.peptide.values)
df = pandas.DataFrame({
'peptide': decoys,
})
df["experiment_name"] = experiment_name
dfs.append(df)
return pandas.concat(dfs, ignore_index=True)
def decoys_for_experiment(self, experiment_name, hit_list):
"""
Return decoys for a single experiment.
Parameters
------------
experiment_name : string
hit_list : list of string
List of hits
"""
# prevent infinite recursion:
assert self.decoys is not DecoyStrategy.decoys
hits_df = pandas.DataFrame({'peptide': hit_list})
hits_df["experiment_name"] = experiment_name
return self.decoys(hits_df)
import numpy
from .decoy_strategy import DecoyStrategy
class SameTranscriptsAsHits(DecoyStrategy):
"""
Decoy strategy that selects decoys from the same transcripts the
hits come from. The transcript for each hit is taken to be the
transcript containing the hit with the the highest expression for
the given experiment.
Parameters
------------
experiment_to_expression_group : dict of string -> string
Maps experiment names to expression groups.
peptides_and_transcripts: pandas.DataFrame
Must have columns 'peptide' and 'transcript', index unimportant.
peptide_to_expression_group_to_transcript : pandas.DataFrame
Indexed by peptides, columns are expression groups. Values
give transcripts to use.
decoys_per_hit : int
"""
def __init__(
self,
experiment_to_expression_group,
peptides_and_transcripts,
peptide_to_expression_group_to_transcript,
decoys_per_hit=10):
DecoyStrategy.__init__(self)
assert decoys_per_hit > 0
self.experiment_to_expression_group = experiment_to_expression_group
self.peptides_and_transcripts = peptides_and_transcripts
self.peptide_to_expression_group_to_transcript = (
peptide_to_expression_group_to_transcript)
self.decoys_per_hit = decoys_per_hit
def decoys_for_experiment(self, experiment_name, hit_list):
assert len(hit_list) > 0, "No hits for %s" % experiment_name
expression_group = self.experiment_to_expression_group[experiment_name]
transcripts = self.peptide_to_expression_group_to_transcript.ix[
hit_list, expression_group
]
assert len(transcripts) > 0, experiment_name
universe = self.peptides_and_transcripts.ix[
self.peptides_and_transcripts.transcript.isin(transcripts) &
(~ self.peptides_and_transcripts.peptide.isin(hit_list))
].peptide.values
assert len(universe) > 0, experiment_name
return numpy.random.choice(
universe,
replace=True,
size=self.decoys_per_hit * len(hit_list))
import numpy
from .decoy_strategy import DecoyStrategy
class UniformRandom(DecoyStrategy):
"""
Decoy strategy that selects decoys randomly from a provided universe
of peptides.
"""
def __init__(self, all_peptides, decoys_per_hit=999):
DecoyStrategy.__init__(self)
self.all_peptides = set(all_peptides)
self.decoys_per_hit = decoys_per_hit
def decoys_for_experiment(self, experiment_name, hit_list):
decoy_pool = self.all_peptides.difference(set(hit_list))
return numpy.random.choice(
list(decoy_pool),
replace=True,
size=self.decoys_per_hit * len(hit_list))
import numpy
class PercentRankTransform(object):
"""
Transform arbitrary values into percent ranks.
"""
def __init__(self, n_bins=1e5):
self.n_bins = int(n_bins)
self.cdf = None
self.bin_edges = None
def fit(self, values):
"""
Fit the transform using the given values, which are used to
establish percentiles.
"""
assert self.cdf is None
assert self.bin_edges is None
assert len(values) > 0
(hist, self.bin_edges) = numpy.histogram(values, bins=self.n_bins)
self.cdf = numpy.ones(len(hist) + 3) * numpy.nan
self.cdf[0] = 0.0
self.cdf[1] = 0.0
self.cdf[-1] = 100.0
numpy.cumsum(hist * 100.0 / numpy.sum(hist), out=self.cdf[2:-1])
assert not numpy.isnan(self.cdf).any()
def transform(self, values):
"""
Return percent ranks (range [0, 100]) for the given values.
"""
assert self.cdf is not None
assert self.bin_edges is not None
indices = numpy.searchsorted(self.bin_edges, values)
result = self.cdf[indices]
assert len(result) == len(values)
return result
from .presentation_component_model import PresentationComponentModel
from .expression import Expression
from .mhcflurry_released import MHCflurryReleased
from .mhcflurry_trained_on_hits import MHCflurryTrainedOnHits
from .fixed_affinity_predictions import FixedAffinityPredictions
from .fixed_per_peptide_quantity import FixedPerPeptideQuantity
from .fixed_per_peptide_and_transcript_quantity import (
FixedPerPeptideAndTranscriptQuantity)
__all__ = [
"PresentationComponentModel",
"Expression",
"MHCflurryReleased",
"MHCflurryTrainedOnHits",
"FixedAffinityPredictions",
"FixedPerPeptideQuantity",
"FixedPerPeptideAndTranscriptQuantity",
]
from .presentation_component_model import PresentationComponentModel
from ...common import assert_no_null
class Expression(PresentationComponentModel):
"""
Model input for transcript expression.
Parameters
------------
experiment_to_expression_group : dict of string -> string
Maps experiment names to expression groups.
expression_values : pandas.DataFrame
Columns should be expression groups. Indices should be peptide.
"""
def __init__(
self, experiment_to_expression_group, expression_values, **kwargs):
PresentationComponentModel.__init__(self, **kwargs)
assert all(
group in expression_values.columns
for group in experiment_to_expression_group.values())
assert_no_null(expression_values)
self.experiment_to_expression_group = experiment_to_expression_group
self.expression_values = expression_values
def column_names(self):
return ["expression"]
def requires_fitting(self):
return False
def predict_for_experiment(self, experiment_name, peptides):
expression_group = self.experiment_to_expression_group[experiment_name]
return {
"expression": (
self.expression_values.ix[peptides, expression_group]
.values)
}
from .presentation_component_model import PresentationComponentModel
from ...common import assert_no_null
class FixedAffinityPredictions(PresentationComponentModel):
"""
Parameters
------------
experiment_to_alleles : dict: string -> string list
Normalized allele names for each experiment.
panel : pandas.Panel
Dimensions should be:
- "value", "percentile_rank" (IC50 and percent rank)
- peptide (string)
- allele (string)
name : string
Used to name output columns and in debug messages
"""
def __init__(
self,
experiment_to_alleles,
panel,
name='precomputed',
**kwargs):
PresentationComponentModel.__init__(self, **kwargs)
self.experiment_to_alleles = experiment_to_alleles
for key in panel.items:
assert_no_null(panel[key])
self.panel = panel
self.name = name
def column_names(self):
return [
"%s_affinity" % self.name,
"%s_percentile_rank" % self.name
]
def requires_fitting(self):
return False
def predict_min_across_alleles(self, alleles, peptides):
return {
("%s_affinity" % self.name): (
self.panel
.value[alleles]
.min(axis=1)
.ix[peptides].values),
("%s_percentile_rank" % self.name): (
self.panel
.percentile_rank[alleles]
.min(axis=1)
.ix[peptides].values)
}
def predict_for_experiment(self, experiment_name, peptides):
alleles = self.experiment_to_alleles[experiment_name]
return self.predict_min_across_alleles(alleles, peptides)
import logging
from .presentation_component_model import PresentationComponentModel
from ...common import assert_no_null
class FixedPerPeptideAndTranscriptQuantity(PresentationComponentModel):
"""
Model input for arbitrary fixed (i.e. not fitted) quantities that
depend only on the peptide and the transcript it comes from, which
is taken to be the most-expressed transcript in the experiment.
Motivating example: netChop cleavage predictions.
Parameters
------------
name : string
Name for this final model input. Used in debug messages.
experiment_to_expression_group : dict of string -> string
Maps experiment names to expression groups.
top_transcripts : pandas.DataFrame
Columns should be expression groups. Indices should be peptide. Values
should be transcript names.
df : pandas.DataFrame
Must have columns 'peptide' and 'transcript'. Remaining columns are
the values emitted by this model input.
"""
def __init__(
self,
name,
experiment_to_expression_group,
top_transcripts,
df,
**kwargs):
PresentationComponentModel.__init__(self, **kwargs)
self.name = name
self.experiment_to_expression_group = experiment_to_expression_group
self.top_transcripts = top_transcripts.copy()
self.df = df.drop_duplicates(['peptide', 'transcript'])
# This hack seems to be faster than using a multindex.
self.df.index = self.df.peptide.str.cat(self.df.transcript, sep=":")
del self.df["peptide"]
del self.df["transcript"]
assert_no_null(self.df)
df_set = set(self.df.index)
missing = set()
for expression_group in self.top_transcripts.columns:
self.top_transcripts[expression_group] = (
self.top_transcripts.index.str.cat(
self.top_transcripts[expression_group],
sep=":"))
missing.update(
set(self.top_transcripts[expression_group]).difference(df_set))
if missing:
logging.warn(
"%s: missing %d (peptide, transcript) pairs from df: %s" % (
self.name,
len(missing),
sorted(missing)[:1000]))
def column_names(self):
return list(self.df.columns)
def requires_fitting(self):
return False
def predict_for_experiment(self, experiment_name, peptides):
expression_group = self.experiment_to_expression_group[experiment_name]
indices = self.top_transcripts.ix[peptides, expression_group]
assert len(indices) == len(peptides)
sub_df = self.df.ix[indices]
assert len(sub_df) == len(peptides)
result = {}
for col in self.column_names():
result_series = sub_df[col]
num_nulls = result_series.isnull().sum()
if num_nulls > 0:
logging.warning("%s: mean-filling for %d nulls" % (
self.name, num_nulls))
result_series = result_series.fillna(self.df[col].mean())
result[col] = result_series.values
return result
from .presentation_component_model import PresentationComponentModel
from ...common import assert_no_null
class FixedPerPeptideQuantity(PresentationComponentModel):
"""
Model input for arbitrary fixed (i.e. not fitted) quantities that
depend only on the peptide. Motivating example: Mike's cleavage
predictions.
Parameters
------------
name : string
Name for this final model input. Used in debug messages.
df : pandas.DataFrame
index must be named 'peptide'. The columns of the dataframe are
the columns emitted by this final modle input.
"""
def __init__(self, name, df, **kwargs):
PresentationComponentModel.__init__(self, **kwargs)
self.name = name
assert df.index.name == "peptide"
assert_no_null(df)
self.df = df
def column_names(self):
return list(self.df.columns)
def requires_fitting(self):
return False
def predict(self, peptides_df):
sub_df = self.df.ix[peptides_df.peptide]
return dict(
(col, sub_df[col].values)
for col in self.column_names())
import logging
import pandas
from numpy import array
from ...common import dataframe_cryptographic_hash
from .presentation_component_model import PresentationComponentModel
from ..decoy_strategies import SameTranscriptsAsHits
from ..percent_rank_transform import PercentRankTransform
class MHCBindingComponentModelBase(PresentationComponentModel):
"""
Base class for single-allele MHC binding predictors.
Parameters
------------
predictor_name : string
used on column name. Example: 'vanilla'
experiment_to_alleles : dict: string -> string list
Normalized allele names for each experiment.
experiment_to_expression_group : dict of string -> string
Maps experiment names to expression groups.
transcripts : pandas.DataFrame
Index is peptide, columns are expression groups, values are
which transcript to use for the given peptide.
Not required if decoy_strategy specified.
peptides_and_transcripts : pandas.DataFrame
Dataframe with columns 'peptide' and 'transcript'
Not required if decoy_strategy specified.
decoy_strategy : decoy_strategy.DecoyStrategy
how to pick decoys. If not specified peptides_and_transcripts and
transcripts must be specified.
fallback_predictor : function: (allele, peptides) -> predictions
Used when missing an allele.
iedb_dataset : mhcflurry.AffinityMeasurementDataset
IEDB data for this allele. If not specified no iedb data is used.
decoys_per_hit : int
random_peptides_for_percent_rank : list of string
If specified, then percentile rank will be calibrated and emitted
using the given peptides.
**kwargs : dict
passed to PresentationComponentModel()
"""
def __init__(
self,
predictor_name,
experiment_to_alleles,
experiment_to_expression_group=None,
transcripts=None,
peptides_and_transcripts=None,
decoy_strategy=None,
fallback_predictor=None,
iedb_dataset=None,
decoys_per_hit=10,
random_peptides_for_percent_rank=None,
**kwargs):
PresentationComponentModel.__init__(self, **kwargs)
self.predictor_name = predictor_name
self.experiment_to_alleles = experiment_to_alleles
self.fallback_predictor = fallback_predictor
self.iedb_dataset = iedb_dataset
self.fit_alleles = set()
if decoy_strategy is None:
assert peptides_and_transcripts is not None
assert transcripts is not None
self.decoy_strategy = SameTranscriptsAsHits(
experiment_to_expression_group=experiment_to_expression_group,
peptides_and_transcripts=peptides_and_transcripts,
peptide_to_expression_group_to_transcript=transcripts,
decoys_per_hit=decoys_per_hit)
else:
self.decoy_strategy = decoy_strategy
if random_peptides_for_percent_rank is None:
self.percent_rank_transforms = None
self.random_peptides_for_percent_rank = None
else:
self.percent_rank_transforms = {}
self.random_peptides_for_percent_rank = array(
random_peptides_for_percent_rank)
def stratification_groups(self, hits_df):
return [
self.experiment_to_alleles[e][0]
for e in hits_df.experiment_name
]
def column_name_value(self):
return "%s_value" % self.predictor_name
def column_name_percentile_rank(self):
return "%s_percentile_rank" % self.predictor_name
def column_names(self):
columns = [self.column_name_value()]
if self.percent_rank_transforms is not None:
columns.append(self.column_name_percentile_rank())
return columns
def requires_fitting(self):
return True
def fit_percentile_rank_if_needed(self, alleles):
for allele in alleles:
if allele not in self.percent_rank_transforms:
logging.info('fitting percent rank for allele: %s' % allele)
self.percent_rank_transforms[allele] = PercentRankTransform()
self.percent_rank_transforms[allele].fit(
self.predict_affinity_for_allele(
allele,
self.random_peptides_for_percent_rank))
def fit(self, hits_df):
assert 'experiment_name' in hits_df.columns
assert 'peptide' in hits_df.columns
if 'hit' in hits_df.columns:
assert (hits_df.hit == 1).all()
grouped = hits_df.groupby("experiment_name")
for (experiment_name, sub_df) in grouped:
self.fit_to_experiment(experiment_name, sub_df.peptide.values)
# No longer required after fitting.
self.decoy_strategy = None
self.iedb_dataset = None
def fit_allele(self, allele, hit_list, decoys_list):
raise NotImplementedError()
def predict_allele(self, allele, peptide_list):
raise NotImplementedError()
def supports_predicting_allele(self, allele):
raise NotImplementedError()
def fit_to_experiment(self, experiment_name, hit_list):
assert len(hit_list) > 0
alleles = self.experiment_to_alleles[experiment_name]
if len(alleles) != 1:
raise ValueError("Monoallelic data required")
(allele,) = alleles
decoys = self.decoy_strategy.decoys_for_experiment(
experiment_name, hit_list)
self.fit_allele(allele, hit_list, decoys)
self.fit_alleles.add(allele)
def predict_affinity_for_allele(self, allele, peptides):
if self.cached_predictions is None:
cache_key = None
cached_result = None
else:
cache_key = (
allele,
dataframe_cryptographic_hash(pandas.Series(peptides)))
cached_result = self.cached_predictions.get(cache_key)
if cached_result is not None:
print("Cache hit in predict_affinity_for_allele: %s %s %s" % (
allele, str(self), id(cached_result)))
return cached_result
else:
print("Cache miss in predict_affinity_for_allele: %s %s" % (
allele, str(self)))
if self.supports_predicting_allele(allele):
result = self.predict_allele(allele, peptides)
elif self.fallback_predictor:
print("Falling back on allee %s" % allele)
result = self.fallback_predictor(allele, peptides)
else:
raise ValueError("No model for allele: %s" % allele)
if self.cached_predictions is not None:
self.cached_predictions[cache_key] = result
return result
def predict_for_experiment(self, experiment_name, peptides):
peptides_deduped = pandas.unique(peptides)
print(len(peptides_deduped))
alleles = self.experiment_to_alleles[experiment_name]
predictions = pandas.DataFrame(index=peptides_deduped)
for allele in alleles:
predictions[allele] = self.predict_affinity_for_allele(
allele, peptides_deduped)
result = {
self.column_name_value(): (
predictions.min(axis=1).ix[peptides].values)
}
if self.percent_rank_transforms is not None:
self.fit_percentile_rank_if_needed(alleles)
percentile_ranks = pandas.DataFrame(index=peptides_deduped)
for allele in alleles:
percentile_ranks[allele] = (
self.percent_rank_transforms[allele]
.transform(predictions[allele].values))
result[self.column_name_percentile_rank()] = (
percentile_ranks.min(axis=1).ix[peptides].values)
assert all(len(x) == len(peptides) for x in result.values()), (
"Result lengths don't match peptide lengths. peptides=%d, "
"peptides_deduped=%d, %s" % (
len(peptides),
len(peptides_deduped),
", ".join(
"%s=%d" % (key, len(value))
for (key, value) in result.items())))
return result
import logging
import numpy
import pandas
from mhcnames import normalize_allele_name
from ..percent_rank_transform import PercentRankTransform
from ...encodable_sequences import EncodableSequences
from .presentation_component_model import PresentationComponentModel
from ...class1_affinity_prediction.class1_affinity_predictor import (
Class1AffinityPredictor)
class MHCflurryReleased(PresentationComponentModel):
"""
Final model input that uses the standard downloaded MHCflurry models.
Parameters
------------
experiment_to_alleles : dict: string -> string list
Normalized allele names for each experiment.
random_peptides_for_percent_rank : list of string
If specified, then percentile rank will be calibrated and emitted
using the given peptides.
predictor : Class1EnsembleMultiAllelePredictor-like object
Predictor to use.
"""
def __init__(
self,
experiment_to_alleles,
random_peptides_for_percent_rank=None,
predictor=None,
predictor_name="mhcflurry_released",
**kwargs):
PresentationComponentModel.__init__(self, **kwargs)
self.experiment_to_alleles = experiment_to_alleles
if predictor is None:
predictor = Class1AffinityPredictor.load()
self.predictor = predictor
self.predictor_name = predictor_name
if random_peptides_for_percent_rank is None:
self.percent_rank_transforms = None
self.random_peptides_for_percent_rank = None
else:
self.percent_rank_transforms = {}
self.random_peptides_for_percent_rank = numpy.array(
random_peptides_for_percent_rank)
def column_names(self):
columns = [self.predictor_name + '_affinity']
if self.percent_rank_transforms is not None:
columns.append(self.predictor_name + '_percentile_rank')
return columns
def requires_fitting(self):
return False
def fit_percentile_rank_if_needed(self, alleles):
for allele in alleles:
if allele not in self.percent_rank_transforms:
logging.info('fitting percent rank for allele: %s' % allele)
self.percent_rank_transforms[allele] = PercentRankTransform()
self.percent_rank_transforms[allele].fit(
self.predictor.predict(
allele=allele,
peptides=self.random_peptides_for_percent_rank))
def predict_min_across_alleles(self, alleles, peptides):
alleles = list(set([
normalize_allele_name(allele)
for allele in alleles
]))
peptides = EncodableSequences.create(peptides)
df = pandas.DataFrame()
df["peptide"] = peptides.sequences
for allele in alleles:
df[allele] = self.predictor.predict(peptides, allele=allele)
result = {
self.predictor_name + '_affinity': (
df[list(df.columns)[1:]].min(axis=1))
}
if self.percent_rank_transforms is not None:
self.fit_percentile_rank_if_needed(alleles)
percentile_ranks = pandas.DataFrame(index=df.index)
for allele in alleles:
percentile_ranks[allele] = (
self.percent_rank_transforms[allele]
.transform(df[allele].values))
result[self.predictor_name + '_percentile_rank'] = (
percentile_ranks.min(axis=1).values)
for (key, value) in result.items():
assert len(value) == len(peptides), (len(peptides), result)
return result
def predict_for_experiment(self, experiment_name, peptides):
alleles = self.experiment_to_alleles[experiment_name]
return self.predict_min_across_alleles(alleles, peptides)
from copy import copy
import pandas
from numpy import log, exp, nanmean
from ...class1_affinity_prediction import Class1AffinityPredictor
from mhcnames import normalize_allele_name
from .mhc_binding_component_model_base import MHCBindingComponentModelBase
class MHCflurryTrainedOnHits(MHCBindingComponentModelBase):
"""
Final model input that is a mhcflurry predictor trained on mass-spec
hits and, optionally, affinity measurements (for example from IEDB).
Parameters
------------
mhcflurry_hyperparameters : dict
hit_affinity : float
nM affinity to use for hits
decoy_affinity : float
nM affinity to use for decoys
**kwargs : dict
Passed to MHCBindingComponentModel()
"""
def __init__(
self,
mhcflurry_hyperparameters={},
hit_affinity=100,
decoy_affinity=20000,
ensemble_size=1,
**kwargs):
MHCBindingComponentModelBase.__init__(self, **kwargs)
self.mhcflurry_hyperparameters = mhcflurry_hyperparameters
self.hit_affinity = hit_affinity
self.decoy_affinity = decoy_affinity
self.ensemble_size = ensemble_size
self.predictor = Class1AffinityPredictor()
def combine_ensemble_predictions(self, column_name, values):
# Geometric mean
return exp(nanmean(log(values), axis=1))
def supports_predicting_allele(self, allele):
return allele in self.predictor.supported_alleles
def fit_allele(self, allele, hit_list, decoys_list):
allele = normalize_allele_name(allele)
hit_list = set(hit_list)
df = pandas.DataFrame({
"peptide": sorted(set(hit_list).union(decoys_list))
})
df["allele"] = allele
df["species"] = "human"
df["affinity"] = ((
~df.peptide.isin(hit_list))
.astype(float) * (
self.decoy_affinity - self.hit_affinity) + self.hit_affinity)
df["sample_weight"] = 1.0
df["peptide_length"] = 9
self.predictor.fit_allele_specific_predictors(
n_models=self.ensemble_size,
architecture_hyperparameters=self.mhcflurry_hyperparameters,
allele=allele,
peptides=df.peptide.values,
affinities=df.affinity.values,
)
def predict_allele(self, allele, peptides_list):
return self.predictor.predict(peptides=peptides_list, allele=allele)
def get_fit(self):
return {
'model': 'MHCflurryTrainedOnMassSpec',
'predictor': self.predictor,
}
def restore_fit(self, fit_info):
fit_info = dict(fit_info)
self.predictor = fit_info.pop('predictor')
model = fit_info.pop('model')
assert model == 'MHCflurryTrainedOnMassSpec', model
assert not fit_info, "Extra info in fit: %s" % str(fit_info)
def clone(self):
result = copy(self)
result.reset_cache()
result.predictor = copy(result.predictor)
return result
import weakref
from copy import copy
import numpy
import pandas
from ...common import (
dataframe_cryptographic_hash, assert_no_null, freeze_object)
def cache_dict_for_policy(policy):
if policy == "weak":
return weakref.WeakValueDictionary()
elif policy == "strong":
return {}
elif policy == "none":
return None
else:
raise ValueError("Unsupported cache policy: %s" % policy)
class PresentationComponentModel(object):
'''
Base class for component models to a presentation model.
The component models are things like mhc binding affinity and cleavage,
and the presentation model is typically a logistic regression model
over these.
'''
def __init__(
self, fit_cache_policy="weak", predictions_cache_policy="weak"):
self.fit_cache_policy = fit_cache_policy
self.predictions_cache_policy = predictions_cache_policy
self.reset_cache()
def reset_cache(self):
self.cached_fits = cache_dict_for_policy(self.fit_cache_policy)
self.cached_predictions = cache_dict_for_policy(
self.predictions_cache_policy)
def __getstate__(self):
d = dict(self.__dict__)
d["cached_fits"] = None
d["cached_predictions"] = None
return d
def __setstate__(self, state):
self.__dict__.update(state)
self.reset_cache()
def combine_ensemble_predictions(self, column_name, values):
return numpy.nanmean(values, axis=1)
def stratification_groups(self, hits_df):
return hits_df.experiment_name
def column_names(self):
"""
Names for the values this final model input emits.
Some final model inputs emit multiple related quantities, such as
"binding affinity" and "binding percentile rank".
"""
raise NotImplementedError(str(self))
def requires_fitting(self):
"""
Does this model require fitting to mass-spec data?
For example, the 'expression' componenet models don't need to be
fit, but some cleavage predictors and binding predictors can be
trained on the ms data.
"""
raise NotImplementedError(str(self))
def clone_and_fit(self, hits_df):
"""
Clone the object and fit to given dataset with a weakref cache.
"""
if not self.requires_fitting():
return self
if self.cached_fits is None:
key = None
result = None
else:
key = dataframe_cryptographic_hash(
hits_df[["experiment_name", "peptide"]])
result = self.cached_fits.get(key)
if result is None:
print("Cache miss in clone_and_fit: %s" % str(self))
result = self.clone()
result.fit(hits_df)
if self.cached_fits is not None:
self.cached_fits[key] = result
else:
print("Cache hit in clone_and_fit: %s" % str(self))
return result
def clone_and_restore_fit(self, fit_info):
if not self.requires_fitting():
assert fit_info is None
return self
if self.cached_fits is None:
key = None
result = None
else:
key = freeze_object(fit_info)
result = self.cached_fits.get(key)
if result is None:
print("Cache miss in clone_and_restore_fit: %s" % str(self))
result = self.clone()
result.restore_fit(fit_info)
if self.cached_fits is not None:
self.cached_fits[key] = result
else:
print("Cache hit in clone_and_restore_fit: %s" % str(self))
return result
def fit(self, hits_df):
"""
Train the model.
Parameters
-----------
hits_df : pandas.DataFrame
dataframe of hits with columns 'experiment_name' and 'peptide'
"""
if self.requires_fitting():
raise NotImplementedError(str(self))
def predict_for_experiment(self, experiment_name, peptides):
"""
A more convenient prediction method to implement.
Subclasses should override this method or predict().
Returns
------------
A dict of column name -> list of predictions for each peptide
"""
assert self.predict != PresentationComponentModel.predict, (
"Must override predict_for_experiment() or predict()")
peptides_df = pandas.DataFrame({
'peptide': peptides,
})
peptides_df["experiment_name"] = experiment_name
return self.predict(peptides_df)
def predict(self, peptides_df):
"""
Subclasses can override either this or predict_for_experiment.
This is the high-level predict method that users should call.
This convenience method groups the peptides_df by experiment
and calls predict_for_experiment on each experiment.
"""
assert (
self.predict_for_experiment !=
PresentationComponentModel.predict_for_experiment)
assert 'experiment_name' in peptides_df.columns
assert 'peptide' in peptides_df.columns
if self.cached_predictions is None:
cache_key = None
cached_result = None
else:
cache_key = dataframe_cryptographic_hash(peptides_df)
cached_result = self.cached_predictions.get(cache_key)
if cached_result is not None:
print("Cache hit in predict: %s" % str(self))
return cached_result
else:
print("Cache miss in predict: %s" % str(self))
grouped = peptides_df.groupby("experiment_name")
if len(grouped) == 1:
print("%s : using single-experiment predict optimization" % (
str(self)))
return_value = pandas.DataFrame(
self.predict_for_experiment(
str(peptides_df.iloc[0].experiment_name),
peptides_df.peptide.values))
assert len(return_value) == len(peptides_df), (
"%d != %d" % (len(return_value), len(peptides_df)),
str(self),
peptides_df.peptide.nunique(),
return_value,
peptides_df)
assert_no_null(return_value, str(self))
else:
peptides_df = (
peptides_df[["experiment_name", "peptide"]]
.reset_index(drop=True))
columns = self.column_names()
result_df = peptides_df.copy()
for col in columns:
result_df[col] = numpy.nan
for (experiment_name, sub_df) in grouped:
assert (
result_df.loc[sub_df.index, "experiment_name"] ==
experiment_name).all()
unique_peptides = list(set(sub_df.peptide))
if len(unique_peptides) == 0:
continue
result_dict = self.predict_for_experiment(
experiment_name, unique_peptides)
for col in columns:
assert len(result_dict[col]) == len(unique_peptides), (
"Final model input %s: wrong number of predictions "
"%d (expected %d) for col %s:\n%s\n"
"Input was: experiment: %s, peptides:\n%s" % (
str(self),
len(result_dict[col]),
len(unique_peptides),
col,
result_dict[col],
experiment_name,
unique_peptides))
prediction_series = pandas.Series(
result_dict[col],
index=unique_peptides)
prediction_values = (
prediction_series.ix[sub_df.peptide.values]).values
result_df.loc[
sub_df.index, col
] = prediction_values
assert len(result_df) == len(peptides_df), "%s != %s" % (
len(result_df),
len(peptides_df))
return_value = result_df[columns]
if self.cached_predictions is not None:
self.cached_predictions[cache_key] = return_value
return dict(
(col, return_value[col].values) for col in self.column_names())
def clone(self):
"""
Copy this object so that the original and copy can be fit
independently.
"""
if self.requires_fitting():
# shallow copy won't work here, subclass must override.
raise NotImplementedError(str(self))
result = copy(self)
# We do not want to share a cache with the clone.
result.reset_cache()
return result
def get_fit(self):
if self.requires_fitting():
raise NotImplementedError(str(self))
return None
def restore_fit(self, fit_info):
if self.requires_fitting():
raise NotImplementedError(str(self))
assert fit_info is None, (str(self), str(fit_info))
import collections
import time
from copy import copy
import logging
import pandas
import numpy
from sklearn.base import clone
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from ..common import assert_no_null, drop_nulls_and_warn
def build_presentation_models(term_dict, formulas, **kwargs):
"""
Convenience function for creating multiple final models based on
shared terms.
Parameters
------------
term_dict : dict of string -> (
list of PresentationComponentModel,
list of string)
Terms are named with arbitrary strings (e.g. "A_ms") and are
associated with some presentation component models and some
expressions (e.g. ["log(affinity_percentile_rank + .001)"]).
formulas : list of string
A formula is a string containing terms separated by "+". For example:
"A_ms + A_cleavage + A_expression".
**kwargs : dict
Passed to PresentationModel constructor
Returns
------------
dict of string -> PresentationModel
The keys of the result dict are formulas, and the values are (untrained)
PresentationModel instances.
"""
result = collections.OrderedDict()
for formula in formulas:
term_names = [x.strip() for x in formula.split("+")]
inputs = []
expressions = []
for name in term_names:
(term_inputs, term_expressions) = term_dict[name]
inputs.extend(term_inputs)
expressions.extend(term_expressions)
assert len(set(expressions)) == len(expressions), expressions
presentation_model = PresentationModel(
inputs,
expressions,
**kwargs)
result[formula] = presentation_model
return result
class PresentationModel(object):
"""
A predictor for whether a peptide is detected via mass-spec. Uses
"final model inputs" (e.g. expression, cleavage, mhc affinity) which
themselves may need to be fit.
Parameters
------------
component_models : list of PresentationComponentModel
feature_expressions : list of string
Expressions to use to generate features for the final model based
on the columns generated by the final model inputs.
Example: ["log(expression + .01)"]
decoy_strategy : DecoyStrategy
Decoy strategy to use for training the final model. (The final
model inputs handle their own decoys.)
random-state : int
Random state to use for picking cross validation folds. We are
careful to be deterministic here (i.e. same folds used if the
random state is the same) because we want to have cache hits
for final model inputs that are being used more than once in
multiple final models fit to the same data.
ensemble_size : int
If specified, train an ensemble of each final model input, and use
the out-of-bag predictors to generate predictions to fit the final
model. If not specified (default), a two-fold fit is used.
"""
def __init__(
self,
component_models,
feature_expressions,
decoy_strategy,
predictor=LogisticRegression(),
random_state=0,
ensemble_size=None):
columns = set()
self.component_models_require_fitting = False
for component_model in component_models:
model_cols = component_model.column_names()
assert not columns.intersection(model_cols), model_cols
columns.update(model_cols)
if component_model.requires_fitting():
self.component_models_require_fitting = True
self.component_models = component_models
self.ensemble_size = ensemble_size
self.feature_expressions = feature_expressions
self.decoy_strategy = decoy_strategy
self.random_state = random_state
self.predictor = predictor
self.trained_component_models = None
self.presentation_models_predictors = None
self.fit_experiments = None
@property
def has_been_fit(self):
return self.fit_experiments is not None
def clone(self):
return copy(self)
def reset_cache(self):
for model in self.component_models:
model.reset_cache()
if self.trained_component_models is not None:
for models in self.trained_component_models:
for ensemble_group in models:
for model in ensemble_group:
model.reset_cache()
def fit(self, hits_df):
"""
Train the final model and its inputs (if necessary).
Parameters
-----------
hits_df : pandas.DataFrame
dataframe of hits with columns 'experiment_name' and 'peptide'
"""
start = time.time()
assert not self.has_been_fit
assert 'experiment_name' in hits_df.columns
assert 'peptide' in hits_df.columns
assert self.trained_component_models is None
assert self.presentation_models_predictors is None
hits_df = hits_df.reset_index(drop=True).copy()
self.fit_experiments = set(hits_df.experiment_name.unique())
if self.component_models_require_fitting and not self.ensemble_size:
# Use two fold CV to train model inputs then final models.
# In this strategy, we fit the component models on half the data,
# and train the final predictor (usually logistic regression) on
# the other half. We do this twice to end up with two final.
# At prediction time, the results of these predictors are averaged.
cv = StratifiedKFold(
n_splits=2, shuffle=True, random_state=self.random_state)
self.trained_component_models = []
self.presentation_models_predictors = []
fold_num = 1
for (fold1, fold2) in cv.split(hits_df, hits_df.experiment_name):
print("Two fold fit: fitting fold %d" % fold_num)
fold_num += 1
assert len(fold1) > 0
assert len(fold2) > 0
model_input_training_hits_df = hits_df.iloc[fold1]
hits_and_decoys_df = make_hits_and_decoys_df(
hits_df.iloc[fold2],
self.decoy_strategy)
self.trained_component_models.append([])
for sub_model in self.component_models:
sub_model = sub_model.clone_and_fit(
model_input_training_hits_df)
self.trained_component_models[-1].append((sub_model,))
predictions = sub_model.predict(hits_and_decoys_df)
for (col, values) in predictions.items():
hits_and_decoys_df[col] = values
final_predictor = self.fit_final_predictor(hits_and_decoys_df)
self.presentation_models_predictors.append(final_predictor)
else:
# Use an ensemble of component predictors. Each component model is
# trained on a random half of the data (self.ensemble_size folds
# in total). Predictions are generated using the out of bag
# predictors. A single final model predictor is trained.
if self.component_models_require_fitting:
print("Using ensemble fit, ensemble size: %d" % (
self.ensemble_size))
else:
print("Using single fold fit.")
component_model_index_to_stratification_groups = []
stratification_groups_to_ensemble_folds = {}
for (i, component_model) in enumerate(self.component_models):
if component_model.requires_fitting():
stratification_groups = tuple(
component_model.stratification_groups(hits_df))
component_model_index_to_stratification_groups.append(
stratification_groups)
stratification_groups_to_ensemble_folds[
stratification_groups
] = []
for (i, (stratification_groups, ensemble_folds)) in enumerate(
stratification_groups_to_ensemble_folds.items()):
print("Preparing folds for stratification group %d / %d" % (
i + 1, len(stratification_groups_to_ensemble_folds)))
while len(ensemble_folds) < self.ensemble_size:
cv = StratifiedKFold(
n_splits=2,
shuffle=True,
random_state=self.random_state + len(ensemble_folds))
for (indices, _) in cv.split(
hits_df, stratification_groups):
ensemble_folds.append(indices)
# We may have one extra fold.
if len(ensemble_folds) == self.ensemble_size + 1:
ensemble_folds.pop()
def fit_and_predict_component(model, fit_df, predict_df):
assert component_model.requires_fitting()
model = component_model.clone_and_fit(fit_df)
predictions = model.predict(predict_df)
return (model, predictions)
# Note: we depend on hits coming before decoys here, so that
# indices into hits_df are also indices into hits_and_decoys_df.
hits_and_decoys_df = make_hits_and_decoys_df(
hits_df, self.decoy_strategy)
self.trained_component_models = [[]]
for (i, component_model) in enumerate(self.component_models):
if component_model.requires_fitting():
print("Training component model %d / %d: %s" % (
i + 1, len(self.component_models), component_model))
stratification_groups = (
component_model_index_to_stratification_groups[i])
ensemble_folds = stratification_groups_to_ensemble_folds[
stratification_groups
]
(models, predictions) = train_and_predict_ensemble(
component_model,
hits_and_decoys_df,
ensemble_folds)
else:
models = (component_model,)
predictions = component_model.predict(hits_and_decoys_df)
self.trained_component_models[0].append(models)
for (col, values) in predictions.items():
hits_and_decoys_df[col] = values
final_predictor = self.fit_final_predictor(hits_and_decoys_df)
self.presentation_models_predictors = [final_predictor]
assert len(self.presentation_models_predictors) == \
len(self.trained_component_models)
for models_group in self.trained_component_models:
assert isinstance(models_group, list)
assert len(models_group) == len(self.component_models)
assert all(
isinstance(ensemble_group, tuple)
for ensemble_group in models_group)
print("Fit final model in %0.1f sec." % (time.time() - start))
# Decoy strategy is no longer required after fitting.
self.decoy_strategy = None
def fit_final_predictor(
self, hits_and_decoys_with_component_predictions_df):
"""
Private helper method.
"""
(x, y) = self.make_features_and_target(
hits_and_decoys_with_component_predictions_df)
print("Training final model predictor on data of shape %s" % (
str(x.shape)))
final_predictor = clone(self.predictor)
final_predictor.fit(x.values, y.values)
return final_predictor
def evaluate_expressions(self, input_df):
result = pandas.DataFrame()
for expression in self.feature_expressions:
# We use numpy module as globals here so math functions
# like log, log1p, exp, are in scope.
try:
values = eval(expression, numpy.__dict__, input_df)
except SyntaxError:
logging.error("Syntax error in expression: %s" % expression)
raise
assert len(values) == len(input_df), expression
if hasattr(values, 'values'):
values = values.values
series = pandas.Series(values)
assert_no_null(series, expression)
result[expression] = series
assert len(result) == len(input_df)
return result
def make_features_and_target(self, hits_and_decoys_df):
"""
Private helper method.
"""
assert 'peptide' in hits_and_decoys_df
assert 'hit' in hits_and_decoys_df
df = self.evaluate_expressions(hits_and_decoys_df)
df['hit'] = hits_and_decoys_df.hit.values
new_df = drop_nulls_and_warn(df, hits_and_decoys_df)
y = new_df["hit"]
del new_df["hit"]
return (new_df, y)
def predict_to_df(self, peptides_df):
"""
Predict for the given peptides_df, which should have columns
'experiment_name' and 'peptide'.
Returns a dataframe giving the predictions. If this final
model's inputs required fitting and therefore the final model
has two predictors trained each fold, the resulting dataframe
will have predictions for both final model predictors.
"""
assert self.has_been_fit
assert 'experiment_name' in peptides_df.columns
assert 'peptide' in peptides_df.columns
assert len(self.presentation_models_predictors) == \
len(self.trained_component_models)
prediction_cols = []
presentation_model_predictions = {}
zipped = enumerate(
zip(
self.trained_component_models,
self.presentation_models_predictors))
for (i, (component_models, presentation_model_predictor)) in zipped:
df = pandas.DataFrame()
for ensemble_models in component_models:
start_t = time.time()
predictions = ensemble_predictions(
ensemble_models, peptides_df)
print(
"Component '%s' (ensemble size=%d) generated %d "
"predictions in %0.2f sec." % (
ensemble_models[0],
len(ensemble_models),
len(peptides_df),
(time.time() - start_t)))
for (col, values) in predictions.items():
values = pandas.Series(values)
assert_no_null(values)
df[col] = values
x_df = self.evaluate_expressions(df)
assert_no_null(x_df)
prediction_col = "Prediction (Model %d)" % (i + 1)
assert prediction_col not in presentation_model_predictions
presentation_model_predictions[prediction_col] = (
presentation_model_predictor
.predict_proba(x_df.values)[:, 1])
prediction_cols.append(prediction_col)
if len(prediction_cols) == 1:
presentation_model_predictions["Prediction"] = (
presentation_model_predictions[prediction_cols[0]])
del presentation_model_predictions[prediction_cols[0]]
else:
presentation_model_predictions["Prediction"] = numpy.mean(
[
presentation_model_predictions[col]
for col in prediction_cols
],
axis=0)
return pandas.DataFrame(presentation_model_predictions)
def predict(self, peptides_df):
"""
Predict for the given peptides_df, which should have columns
'experiment_name' and 'peptide'.
Returns an array of floats giving the predictions for each
row in peptides_df. If the final model was trained in two
folds, the predictions from the two final model predictors
are averaged.
"""
assert self.has_been_fit
df = self.predict_to_df(peptides_df)
return df.Prediction.values
def score_from_peptides_df(
self, peptides_df, include_hit_indices=True):
"""
Given a DataFrame with columns 'peptide', 'experiment_name', and
'hit', calculate the PPV score. Return a dict of scoring info.
If include_hit_indices is True (default), then the indices the
hits occur in after sorting by prediction score, is also returned.
The top predicted peptide will have index 0.
"""
assert self.has_been_fit
assert 'peptide' in peptides_df.columns
assert 'experiment_name' in peptides_df.columns
assert 'hit' in peptides_df.columns
peptides_df = peptides_df.copy()
peptides_df["prediction"] = self.predict(peptides_df)
top_n = float(peptides_df.hit.sum())
if not include_hit_indices:
top = peptides_df.nlargest(top_n, "prediction")
result = {
'score': top.hit.mean()
}
else:
ranks = peptides_df.prediction.rank(ascending=False)
hit_indices = ranks[peptides_df.hit > 0].values
hit_lengths = peptides_df.peptide[
peptides_df.hit > 0
].str.len().values
result = {
'hit_indices': hit_indices,
'hit_lengths': hit_lengths,
'total_peptides': len(peptides_df),
}
result['score'] = (
numpy.sum(result['hit_indices'] <= top_n) / top_n)
return result
def score_from_hits_and_decoy_strategy(self, hits_df, decoy_strategy):
"""
Compute positive predictive value on the given hits_df.
Parameters
-----------
hits_df : pandas.DataFrame
dataframe of hits with columns 'experiment_name' and 'peptide'
decoy_strategy : DecoyStrategy
Strategy for selecting decoys
Returns
-----------
dict of scoring info, with keys 'score', 'hit_indices', and
'total_peptides'
"""
assert self.has_been_fit
peptides_df = make_hits_and_decoys_df(
hits_df,
decoy_strategy)
return self.score_from_peptides_df(peptides_df)
def get_fit(self):
"""
Return fit (i.e. trained) parameters.
"""
assert self.has_been_fit
result = {
'trained_component_model_fits': [],
'presentation_models_predictors': (
self.presentation_models_predictors),
'fit_experiments': self.fit_experiments,
'feature_expressions': self.feature_expressions,
}
for final_predictor_models_group in self.trained_component_models:
fits = []
for ensemble_group in final_predictor_models_group:
fits.append(tuple(model.get_fit() for model in ensemble_group))
result['trained_component_model_fits'].append(fits)
return result
def restore_fit(self, fit):
"""
Restore fit parameters.
Parameters
------------
fit : object
What was returned from a call to get_fit().
"""
assert not self.has_been_fit
fit = dict(fit)
self.presentation_models_predictors = (
fit.pop('presentation_models_predictors'))
self.fit_experiments = fit.pop('fit_experiments')
model_input_fits = fit.pop('trained_component_model_fits')
feature_expressions = fit.pop('feature_expressions', [])
if feature_expressions != self.feature_expressions:
logging.warn(
"Feature expressions restored from fit: '%s' do not match "
"those of this PresentationModel: '%s'" % (
feature_expressions, self.feature_expressions))
assert not fit, "Unhandled data in fit: %s" % fit
assert (
len(model_input_fits) == len(self.presentation_models_predictors))
self.trained_component_models = []
for model_input_fits_for_fold in model_input_fits:
self.trained_component_models.append([])
for (sub_model, sub_model_fits) in zip(
self.component_models,
model_input_fits_for_fold):
restored_models = tuple(
sub_model.clone_and_restore_fit(sub_model_fit)
for sub_model_fit in sub_model_fits)
self.trained_component_models[-1].append(restored_models)
def make_hits_and_decoys_df(hits_df, decoy_strategy):
"""
Given some hits (with columns 'experiment_name' and 'peptide'),
and a decoy strategy, return a "peptides_df", which has columns
'experiment_name', 'peptide', and 'hit.'
"""
hits_df = hits_df.copy()
hits_df["hit"] = 1
decoys_df = decoy_strategy.decoys(hits_df)
decoys_df["hit"] = 0
peptides_df = pandas.concat(
[hits_df, decoys_df],
ignore_index=True)
return peptides_df
# TODO: paralellize this.
def train_and_predict_ensemble(model, peptides_df, ensemble_folds):
assert model.requires_fitting()
fit_models = tuple(
model.clone_and_fit(peptides_df.iloc[indices])
for indices in ensemble_folds)
return (
fit_models,
ensemble_predictions(fit_models, peptides_df, ensemble_folds))
def ensemble_predictions(models, peptides_df, mask_indices_list=None):
typical_model = models[0]
panel = pandas.Panel(
items=numpy.arange(len(models)),
major_axis=peptides_df.index,
minor_axis=typical_model.column_names(),
dtype=numpy.float32)
for (i, model) in enumerate(models):
predictions = model.predict(peptides_df)
for (key, values) in predictions.items():
panel.loc[i, :, key] = values
if mask_indices_list is not None:
for (i, indices) in enumerate(mask_indices_list):
panel.iloc[i, indices] = numpy.nan
result = {}
for col in typical_model.column_names():
values = panel.ix[:, :, col]
assert values.shape == (len(peptides_df), len(models))
result[col] = model.combine_ensemble_predictions(col, values.values)
assert_no_null(pandas.Series(result[col]))
return result
......@@ -97,8 +97,5 @@ if __name__ == '__main__':
packages=[
'mhcflurry',
'mhcflurry.class1_affinity_prediction',
'mhcflurry.antigen_presentation',
'mhcflurry.antigen_presentation.decoy_strategies',
'mhcflurry.antigen_presentation.presentation_component_models',
],
)
import pickle
from nose.tools import eq_, assert_less
import numpy
from numpy.testing import assert_allclose
import pandas
from mhcflurry.antigen_presentation import (
decoy_strategies,
percent_rank_transform,
presentation_component_models,
presentation_model)
from mhcflurry.amino_acid import COMMON_AMINO_ACIDS
from mhcflurry.common import random_peptides
######################
# Helper functions
def hit_criterion(experiment_name, peptide):
# Peptides with 'A' are always hits. Easy for model to learn.
return 'A' in peptide
######################
# Small test dataset
PEPTIDES = random_peptides(1000, 9)
OTHER_PEPTIDES = random_peptides(1000, 9)
TRANSCRIPTS = [
"transcript-%d" % i
for i in range(1, 10)
]
EXPERIMENT_TO_ALLELES = {
'exp1': ['HLA-A*01:01'],
'exp2': ['HLA-A*02:01', 'HLA-B*51:01'],
}
EXPERIMENT_TO_EXPRESSION_GROUP = {
'exp1': 'group1',
'exp2': 'group2',
}
EXPERESSION_GROUPS = sorted(set(EXPERIMENT_TO_EXPRESSION_GROUP.values()))
TRANSCIPTS_DF = pandas.DataFrame(index=PEPTIDES, columns=EXPERESSION_GROUPS)
TRANSCIPTS_DF[:] = numpy.random.choice(TRANSCRIPTS, size=TRANSCIPTS_DF.shape)
PEPTIDES_AND_TRANSCRIPTS_DF = TRANSCIPTS_DF.stack().to_frame().reset_index()
PEPTIDES_AND_TRANSCRIPTS_DF.columns = ["peptide", "group", "transcript"]
del PEPTIDES_AND_TRANSCRIPTS_DF["group"]
PEPTIDES_DF = pandas.DataFrame({"peptide": PEPTIDES})
PEPTIDES_DF["experiment_name"] = "exp1"
PEPTIDES_DF["hit"] = [
hit_criterion(row.experiment_name, row.peptide)
for _, row in
PEPTIDES_DF.iterrows()
]
print("Hit rate: %0.3f" % PEPTIDES_DF.hit.mean())
AA_COMPOSITION_DF = pandas.DataFrame({
'peptide': sorted(set(PEPTIDES).union(set(OTHER_PEPTIDES))),
})
for aa in sorted(COMMON_AMINO_ACIDS):
AA_COMPOSITION_DF[aa] = AA_COMPOSITION_DF.peptide.str.count(aa)
AA_COMPOSITION_DF.index = AA_COMPOSITION_DF.peptide
del AA_COMPOSITION_DF['peptide']
HITS_DF = PEPTIDES_DF.ix[PEPTIDES_DF.hit].reset_index().copy()
# Add some duplicates:
HITS_DF = pandas.concat([HITS_DF, HITS_DF.iloc[:10]], ignore_index=True)
del HITS_DF["hit"]
######################
# Tests
def test_percent_rank_transform():
model = percent_rank_transform.PercentRankTransform()
model.fit(numpy.arange(1000))
assert_allclose(
model.transform([-2, 0, 50, 100, 2000]),
[0.0, 0.0, 5.0, 10.0, 100.0],
err_msg=str(model.__dict__))
def mhcflurry_basic_model():
return presentation_component_models.MHCflurryTrainedOnHits(
predictor_name="mhcflurry_affinity",
experiment_to_alleles=EXPERIMENT_TO_ALLELES,
experiment_to_expression_group=EXPERIMENT_TO_EXPRESSION_GROUP,
transcripts=TRANSCIPTS_DF,
peptides_and_transcripts=PEPTIDES_AND_TRANSCRIPTS_DF,
random_peptides_for_percent_rank=OTHER_PEPTIDES,
)
def mhcflurry_released_model():
return presentation_component_models.MHCflurryReleased(
predictor_name="mhcflurry_ensemble",
experiment_to_alleles=EXPERIMENT_TO_ALLELES,
random_peptides_for_percent_rank=OTHER_PEPTIDES,
fit_cache_policy="strong",
predictions_cache_policy="strong")
def test_mhcflurry_trained_on_hits():
mhcflurry_model = mhcflurry_basic_model()
mhcflurry_model.fit(HITS_DF)
peptides = PEPTIDES_DF.copy()
predictions = mhcflurry_model.predict(peptides)
peptides["affinity"] = predictions["mhcflurry_affinity_value"]
peptides["percent_rank"] = predictions[
"mhcflurry_affinity_percentile_rank"
]
assert_less(
peptides.affinity[peptides.hit].mean(),
peptides.affinity[~peptides.hit].mean())
assert_less(
peptides.percent_rank[peptides.hit].mean(),
peptides.percent_rank[~peptides.hit].mean())
def compare_predictions(peptides_df, model1, model2):
predictions1 = model1.predict(peptides_df)
predictions2 = model2.predict(peptides_df)
failed = False
for i in range(len(peptides_df)):
if abs(predictions1[i] - predictions2[i]) > .0001:
failed = True
print(
"Compare predictions: mismatch at index %d: "
"%f != %f, row: %s" % (
i,
predictions1[i],
predictions2[i],
str(peptides_df.iloc[i])))
assert not failed
def test_presentation_model():
mhcflurry_model = mhcflurry_basic_model()
mhcflurry_ie_model = mhcflurry_released_model()
aa_content_model = (
presentation_component_models.FixedPerPeptideQuantity(
"aa composition",
numpy.log1p(AA_COMPOSITION_DF)))
decoys = decoy_strategies.UniformRandom(
OTHER_PEPTIDES,
decoys_per_hit=50)
terms = {
'A_ie': (
[mhcflurry_ie_model],
["log1p(mhcflurry_ensemble_affinity)"]),
'A_ms': (
[mhcflurry_model],
["log1p(mhcflurry_affinity_value)"]),
'P': (
[aa_content_model],
list(AA_COMPOSITION_DF.columns)),
}
for kwargs in [{}, {'ensemble_size': 3}]:
models = presentation_model.build_presentation_models(
terms,
["A_ms", "A_ms + P", "A_ie + P"],
decoy_strategy=decoys,
**kwargs)
eq_(len(models), 3)
unfit_model = models["A_ms"]
model = unfit_model.clone()
model.fit(HITS_DF.ix[HITS_DF.experiment_name == "exp1"])
peptides = PEPTIDES_DF.copy()
peptides["prediction"] = model.predict(peptides)
print(peptides)
print("Hit mean", peptides.prediction[peptides.hit].mean())
print("Decoy mean", peptides.prediction[~peptides.hit].mean())
assert_less(
peptides.prediction[~peptides.hit].mean(),
peptides.prediction[peptides.hit].mean())
model2 = pickle.loads(pickle.dumps(model))
compare_predictions(peptides, model, model2)
model3 = unfit_model.clone()
assert not model3.has_been_fit
model3.restore_fit(model2.get_fit())
compare_predictions(peptides, model, model3)
better_unfit_model = models["A_ms + P"]
model = better_unfit_model.clone()
model.fit(HITS_DF.ix[HITS_DF.experiment_name == "exp1"])
peptides["prediction_better"] = model.predict(peptides)
assert_less(
peptides.prediction_better[~peptides.hit].mean(),
peptides.prediction[~peptides.hit].mean())
assert_less(
peptides.prediction[peptides.hit].mean(),
peptides.prediction_better[peptides.hit].mean())
another_unfit_model = models["A_ie + P"]
model = another_unfit_model.clone()
model.fit(HITS_DF.ix[HITS_DF.experiment_name == "exp1"])
model.predict(peptides)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment