From 92ee5a23c0618588eab15788ad8623c25bfcacb6 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Thu, 3 Oct 2019 12:58:41 -0400
Subject: [PATCH] update

---
 .../run_thirdparty_predictors.py | 158 ++++++++++++++++--
 1 file changed, 142 insertions(+), 16 deletions(-)

diff --git a/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py b/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py
index a45115ee..26b5c694 100644
--- a/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py
+++ b/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py
@@ -16,7 +16,6 @@ from mhcnames import normalize_allele_name
 import tqdm  # progress bar
 tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 
-from mhcflurry.class1_affinity_predictor import Class1AffinityPredictor
 from mhcflurry.common import configure_logging
 from mhcflurry.local_parallelism import (
     add_local_parallelism_args,
@@ -43,7 +42,16 @@ parser.add_argument(
 parser.add_argument(
     "--predictor",
     required=True,
-    choices=("netmhcpan4", "netmhcpan4-el"))
+    choices=("mhcflurry", "netmhcpan4"))
+parser.add_argument(
+    "--mhcflurry-models-dir",
+    metavar="DIR",
+    help="Directory to read MHCflurry models")
+parser.add_argument(
+    "--mhcflurry-batch-size",
+    type=int,
+    default=4096,
+    help="Keras batch size for MHCflurry predictions. Default: %(default)s")
 parser.add_argument(
     "--allele",
     default=None,
@@ -73,27 +81,79 @@ add_local_parallelism_args(parser)
 add_cluster_parallelism_args(parser)
 
 
-def load_results(dirname, result_df=None, col_names=None):
+def load_results(dirname, result_df=None):
     peptides = pandas.read_csv(
-        os.path.join(dirname, "peptides.csv")).peptide.values
+        os.path.join(dirname, "peptides.csv")).peptide
     manifest_df = pandas.read_csv(os.path.join(dirname, "alleles.csv"))
-    if col_names:
-        manifest_df = manifest_df.loc[manifest_df.col.isin(col_names)]
+
+    print(
+        "Loading results. Existing data has",
+        len(peptides),
+        "peptides and",
+        len(manifest_df),
+        "columns")
 
     if result_df is None:
         result_df = pandas.DataFrame(
             index=peptides, columns=manifest_df.col.values, dtype="float32")
         result_df[:] = numpy.nan
+    else:
+        manifest_df = manifest_df.loc[manifest_df.col.isin(result_df.columns)]
+        peptides = peptides[peptides.isin(result_df.index)]
+
+    print("Will load", len(peptides), "peptides and", len(manifest_df), "columns")
 
     for _, row in manifest_df.iterrows():
         with open(os.path.join(dirname, row.path), "rb") as fd:
             result_df.loc[
-                peptides, row.col
+                peptides.values, row.col
             ] = numpy.load(fd)['arr_0']
 
     return result_df
 
 
+def blocks_of_ones(arr):
+    """
+    Given a binary matrix, return indices of rectangular blocks of 1s.
+
+    Parameters
+    ----------
+    arr : binary matrix
+
+    Returns
+    -------
+    List of (x1, y1, x2, y2) where all indices are INCLUSIVE. Each block
+    spans from (x1, y1) at its upper left corner to (x2, y2) at its lower
+    right corner.
+    """
+    arr = arr.copy()
+    blocks = []
+    while arr.sum() > 0:
+        # Upper left corner of the next block: first 1 in row-major order.
+        (x1, y1) = numpy.unravel_index(arr.argmax(), arr.shape)
+        block = [x1, y1, x1, y1]
+
+        # Extend in first dimension as far as possible.
+        down_stop = numpy.argmax(arr[x1:, y1] == 0) - 1
+        if down_stop == -1:
+            # No 0 below (x1, y1): the block runs to the bottom edge.
+            block[2] = arr.shape[0] - 1
+        else:
+            assert down_stop >= 0
+            block[2] = x1 + down_stop
+
+        # Extend in second dimension as far as possible, stopping at the
+        # first column that is not all 1s.
+        for i in range(y1, arr.shape[1]):
+            if (arr[block[0] : block[2] + 1, i] == 1).all():
+                block[3] = i
+            else:
+                break
+
+        # Zero out block:
+        assert (
+            arr[block[0] : block[2] + 1, block[1] : block[3] + 1] == 1).all(), (arr, block)
+        arr[block[0] : block[2] + 1, block[1] : block[3] + 1] = 0
+
+        blocks.append(block)
+    return blocks
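+
+
+# For illustration: applied to the mask
+#
+#     [[1, 1, 0],
+#      [1, 1, 0],
+#      [0, 0, 1]]
+#
+# blocks_of_ones returns [[0, 0, 1, 1], [2, 2, 2, 2]]: the 2x2 block of 1s
+# in the upper left corner, then the single 1 at the lower right.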
+
+
 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
@@ -125,8 +185,8 @@ def run(argv=sys.argv[1:]):
         print("Creating", args.out)
         os.mkdir(args.out)
 
-    GLOBAL_DATA["predictor"] = args.predictor
+    GLOBAL_DATA["args"] = args
 
     # Write peptide and allele lists to out dir.
     out_peptides = os.path.abspath(os.path.join(args.out, "peptides.csv"))
@@ -150,11 +210,40 @@ def run(argv=sys.argv[1:]):
     print("Wrote: ", out_manifest)
 
     result_df = pandas.DataFrame(
-        index=peptides, columns=manifest_df.columns.values, dtype="float32")
+        index=peptides, columns=manifest_df.col.values, dtype="float32")
     result_df[:] = numpy.nan
 
     if args.reuse_predictions:
-        raise NotImplementedError()
+        print("Loading predictions", args.reuse_predictions)
+        result_df = load_results(args.reuse_predictions, result_df)
+        print("Existing data filled %0.2f%% of entries" % (
+            100.0 * result_df.notnull().values.mean()))
+
+        # We rerun any alleles that have nulls for any kind of value
+        # (affinity, percentile rank, elution score).
+        print("Computing blocks.")
+        start = time.time()
+        blocks = blocks_of_ones(result_df.isnull().values)
+        print("Found %d blocks in %f sec." % (
+            len(blocks), (time.time() - start)))
+
+        work_items = []
+        for (row_index1, col_index1, row_index2, col_index2) in blocks:
+            block_cols = result_df.columns[col_index1 : col_index2 + 1]
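+            # Column names look like "<allele> <kind>", e.g.
+            # "HLA-A*02:01 affinity", so the first whitespace-separated
+            # token of each column name is the allele it belongs to.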
print("Running on cluster.") results = cluster_results_from_args( args, - work_function=do_predictions, + work_function=do_predictions_function, work_items=work_items, constant_data=GLOBAL_DATA, input_serialization_method="dill", @@ -196,7 +290,7 @@ def run(argv=sys.argv[1:]): print("Worker pool", worker_pool) assert worker_pool is not None results = worker_pool.imap_unordered( - partial(call_wrapped_kwargs, do_predictions), + partial(call_wrapped_kwargs, do_predictions_function), work_items, chunksize=1) @@ -231,7 +325,7 @@ def run(argv=sys.argv[1:]): prediction_time / 60.0)) -def do_predictions(work_item_num, peptides, alleles, constant_data=None): +def do_predictions_mhctools(work_item_num, peptides, alleles, constant_data=None): # This may run on the cluster in a way that misses all top level imports, # so we have to re-import everything here. import time @@ -242,7 +336,7 @@ def do_predictions(work_item_num, peptides, alleles, constant_data=None): if constant_data is None: constant_data = GLOBAL_DATA - predictor_name = constant_data['predictor'] + predictor_name = constant_data['args'].predictor if predictor_name == "netmhcpan4": predictor = mhctools.NetMHCpan4( alleles=alleles, program_name="netMHCpan-4.0") @@ -262,5 +356,37 @@ def do_predictions(work_item_num, peptides, alleles, constant_data=None): return (work_item_num, results) +def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=None): + # This may run on the cluster in a way that misses all top level imports, + # so we have to re-import everything here. + import time + from mhcflurry.encodable_sequences import EncodableSequences + from mhcflurry import Class1AffinityPredictor + + if constant_data is None: + constant_data = GLOBAL_DATA + + args = constant_data['args'] + + assert args.predictor == "mhcflurry" + + predictor = Class1AffinityPredictor.load(args.mhcflurry_models_dir) + + start = time.time() + results = {} + peptides = EncodableSequences.create(peptides) + for (i, allele) in enumerate(alleles): + print("Processing allele %d / %d: %0.2f sec elapsed" % ( + i + 1, len(alleles), time.time() - start)) + for col in ["affinity"]: + results["%s %s" % (allele, col)] = predictor.predict( + peptides=peptides, + allele=allele, + throw=False, + model_kwargs={'batch_size': args.batch_size}).astype('float32') + print("Done predicting in", time.time() - start, "sec") + return (work_item_num, results) + + if __name__ == '__main__': run() -- GitLab