# Copyright (c) 2016. Mount Sinai School of Medicine
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import (
print_function,
division,
absolute_import,
)
import collections
import logging
import time
import socket
import math
import numpy
import pandas
import mhcflurry
from .scoring import make_scores
from .class1_binding_predictor import Class1BindingPredictor
from ..hyperparameters import HyperparameterDefaults
from ..parallelism import get_default_executor, map_throw_fast
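# Training-level hyperparameters (currently just whether to pretrain on
# imputed data) extend the model hyperparameters defined on
# Class1BindingPredictor.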
TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
HYPERPARAMETER_DEFAULTS = (
Class1BindingPredictor.hyperparameter_defaults
.extend(TRAIN_HYPERPARAMETER_DEFAULTS))
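# One cross-validation fold for a single allele: measured training data,
# optionally imputed training data, and held-out test data.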
AlleleSpecificTrainTestFold = collections.namedtuple(
"AlleleSpecificTrainTestFold",
"allele train imputed_train test")
def impute_and_select_allele(dataset, imputer, allele=None, **kwargs):
'''
Run imputation and optionally filter to the specified allele.
Useful as a parallelized task where we want to filter to the desired
data *before* sending the result back to the master process.
Parameters
-----------
dataset : mhcflurry.Dataset
imputer : object or string
See Dataset.impute_missing_values
allele : string [optional]
        Allele name to filter to after imputation
**kwargs : passed on to dataset.impute_missing_values
Returns
-----------
    mhcflurry.Dataset or None
        The imputed Dataset, or None if the specified allele is absent
        from the imputation result.
'''
result = dataset.impute_missing_values(imputer, **kwargs)
if allele is not None:
try:
result = result.get_allele(allele)
except KeyError:
result = None
return result
def train_and_test_one_model(model_description, folds, **kwargs):
'''
Train one model on some number of folds.
Parameters
-----------
model_description : dict of model hyperparameters
folds : list of AlleleSpecificTrainTestFold
**kwargs : passed on to train_and_test_one_model_one_fold
Returns
-----------
list of dict giving the train and test results for each fold
'''
logging.info("Training 1 model on %d folds: %s" % (len(folds), folds))
return [
train_and_test_one_model_one_fold(
model_description,
fold.train,
fold.test,
fold.imputed_train,
**kwargs)
for fold in folds
]
def train_and_test_one_model_one_fold(
model_description,
train_dataset,
test_dataset=None,
imputed_train_dataset=None,
return_train_scores=True,
return_predictor=False,
return_train_predictions=False,
return_test_predictions=False):
'''
Task for instantiating, training, and testing one model on one fold.
Parameters
-----------
model_description : dict of model parameters
train_dataset : mhcflurry.Dataset
Dataset to train on. Must include only one allele.
test_dataset : mhcflurry.Dataset, optional
        Dataset to test on. Must include only one allele. If not
        specified, no testing is performed.
imputed_train_dataset : mhcflurry.Dataset, optional
Required only if model_description["impute"] == True
return_train_scores : boolean
Calculate and include in the result dict the auc/f1/tau scores on the
training data.
return_predictor : boolean
Calculate and include in the result dict the trained predictor.
return_train_predictions : boolean
Calculate and include in the result dict the model predictions on the
train data.
return_test_predictions : boolean
Calculate and include in the result dict the model predictions on the
test data.
Returns
-----------
dict
'''
assert len(train_dataset.unique_alleles()) == 1, "Multiple train alleles"
allele = train_dataset.alleles[0]
if test_dataset is not None:
        assert len(test_dataset.unique_alleles()) == 1, \
            "Multiple test alleles"
        assert test_dataset.alleles[0] == allele, \
            "Wrong test allele %s != %s" % (test_dataset.alleles[0], allele)
if imputed_train_dataset is not None:
assert len(imputed_train_dataset.unique_alleles()) == 1, \
"Multiple imputed train alleles"
assert imputed_train_dataset.alleles[0] == allele, \
"Wrong imputed train allele %s != %s" % (
imputed_train_dataset.alleles[0], allele)
if model_description["impute"]:
assert imputed_train_dataset is not None
# Make a predictor
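    # Pop the training-only hyperparameters; everything remaining in
    # model_params is passed directly to the Class1BindingPredictor
    # constructor.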
model_params = dict(model_description)
fraction_negative = model_params.pop("fraction_negative")
impute = model_params.pop("impute")
n_training_epochs = model_params.pop("n_training_epochs")
pretrain_decay = model_params.pop("pretrain_decay")
batch_size = model_params.pop("batch_size")
max_ic50 = model_params.pop("max_ic50")
logging.info(
"%10s train_size=%d test_size=%d impute=%s model=%s" %
(allele,
len(train_dataset),
len(test_dataset) if test_dataset is not None else 0,
impute,
model_description))
predictor = mhcflurry.Class1BindingPredictor(
max_ic50=max_ic50,
**model_params)
# Train predictor
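    # fit_time starts negative so that adding time.time() after the fit
    # yields elapsed wall-clock seconds.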
fit_time = -time.time()
predictor.fit_dataset(
train_dataset,
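        # pretrain_decay is a string expression evaluated once per epoch
        # with 'epoch' and 'numpy' in scope, e.g. "numpy.exp(-epoch)".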
pretrain_decay=lambda epoch: eval(pretrain_decay, {
'epoch': epoch, 'numpy': numpy}),
pretraining_dataset=imputed_train_dataset if impute else None,
verbose=True,
batch_size=batch_size,
n_training_epochs=n_training_epochs,
n_random_negative_samples=int(fraction_negative * len(train_dataset)))
fit_time += time.time()
result = {
'fit_time': fit_time,
'fit_host': socket.gethostname(),
}
if return_predictor:
result['predictor'] = predictor
if return_train_scores or return_train_predictions:
train_predictions = predictor.predict(train_dataset.peptides)
if return_train_scores:
result['train_scores'] = make_scores(
train_dataset.affinities,
train_predictions,
max_ic50=model_description["max_ic50"])
if return_train_predictions:
result['train_predictions'] = train_predictions
if test_dataset is not None:
test_predictions = predictor.predict(test_dataset.peptides)
result['test_scores'] = make_scores(
test_dataset.affinities,
test_predictions,
max_ic50=model_description["max_ic50"])
if return_test_predictions:
result['test_predictions'] = test_predictions
logging.info("Training result: %s" % result)
return result
def train_across_models_and_folds(
folds,
model_descriptions,
cartesian_product_of_folds_and_models=True,
return_predictors=False,
folds_per_task=1,
executor=None):
'''
Train and optionally test any number of models across any number of folds.
Parameters
-----------
folds : list of AlleleSpecificTrainTestFold
model_descriptions : list of dict
Models to test
cartesian_product_of_folds_and_models : boolean, optional
        If true, then a predictor is trained for each fold and model
description.
If false, then len(folds) must equal len(model_descriptions), and
the i'th model is trained on the i'th fold.
return_predictors : boolean, optional
Include the trained predictors in the result.
    folds_per_task : int, optional
        Number of folds each parallel task trains on.
    executor : executor, optional
        Executor to run tasks on. Defaults to the executor returned by
        get_default_executor().
Returns
-----------
pandas.DataFrame
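    Examples
    -----------
    A minimal sketch, assuming `folds` and a complete hyperparameter
    dict `model_description` have been constructed elsewhere. Score
    columns such as test_auc follow the keys returned by make_scores:
    >>> results_df = train_across_models_and_folds(
    ...     folds, [model_description])
    >>> results_df[["allele", "fold_num", "test_auc"]]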
'''
if executor is None:
executor = get_default_executor()
if cartesian_product_of_folds_and_models:
tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
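        # Distribute fold indices round-robin across the task groups so
        # that each task trains on at most folds_per_task folds.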
fold_index_groups = [[] for _ in range(tasks_per_model)]
index_group = 0
for index in range(len(folds)):
fold_index_groups[index_group].append(index)
index_group += 1
if index_group == len(fold_index_groups):
index_group = 0
task_model_and_fold_indices = [
(model_num, group)
for group in fold_index_groups
for model_num in range(len(model_descriptions))
]
else:
assert len(folds) == len(model_descriptions), \
"folds and models have different lengths and " \
"cartesian_product_of_folds_and_models is False"
task_model_and_fold_indices = [
(num, [num])
for num in range(len(folds))
]
logging.info("Training %d architectures on %d folds = %d tasks." % (
len(model_descriptions), len(folds), len(task_model_and_fold_indices)))
def train_and_test_one_model_task(model_and_fold_nums_pair):
(model_num, fold_nums) = model_and_fold_nums_pair
return train_and_test_one_model(
model_descriptions[model_num],
[folds[i] for i in fold_nums],
return_predictor=return_predictors)
task_results = map_throw_fast(
executor,
train_and_test_one_model_task,
task_model_and_fold_indices)
logging.info("Done.")
results_dict = collections.OrderedDict()
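    # Accumulate results column-wise; each call to column() appends one
    # value to the named column of the eventual DataFrame.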
def column(key, value):
if key not in results_dict:
results_dict[key] = []
results_dict[key].append(value)
for ((model_num, fold_nums), task_results_for_folds) in zip(
task_model_and_fold_indices, task_results):
for (fold_num, task_result) in zip(fold_nums, task_results_for_folds):
fold = folds[fold_num]
model_description = model_descriptions[model_num]
column("allele", fold.allele)
column("fold_num", fold_num)
column("model_num", model_num)
column("train_size", len(fold.train))
column(
"test_size",
len(fold.test) if fold.test is not None else None)
column(
"imputed_train_size",
len(fold.imputed_train)
if fold.imputed_train is not None else None)
# Scores
for score_kind in ['train', 'test']:
field = "%s_scores" % score_kind
for (score, value) in task_result.pop(field, {}).items():
column("%s_%s" % (score_kind, score), value)
# Misc. fields
for (key, value) in task_result.items():
column(key, value)
# Model parameters
for (model_param, value) in model_description.items():
column("model_%s" % model_param, value)
results_df = pandas.DataFrame(results_dict)
return results_df