From c9b683e924364605117737c1c3754dd2dc50fc42 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Thu, 3 Oct 2019 13:01:13 -0400
Subject: [PATCH] Rename requiments.txt to requirements.txt, delete
 run_mhcflurry.py, and accept multiple --reuse-predictions directories

---
 .../{requiments.txt => requirements.txt}      |   0
 .../data_mass_spec_benchmark/run_mhcflurry.py | 257 ------------------
 ...dparty_predictors.py => run_predictors.py} |  10 +-
 3 files changed, 6 insertions(+), 261 deletions(-)
 rename downloads-generation/data_mass_spec_benchmark/{requiments.txt => requirements.txt} (100%)
 delete mode 100644 downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
 rename downloads-generation/data_mass_spec_benchmark/{run_thirdparty_predictors.py => run_predictors.py} (97%)

diff --git a/downloads-generation/data_mass_spec_benchmark/requiments.txt b/downloads-generation/data_mass_spec_benchmark/requirements.txt
similarity index 100%
rename from downloads-generation/data_mass_spec_benchmark/requiments.txt
rename to downloads-generation/data_mass_spec_benchmark/requirements.txt
diff --git a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
deleted file mode 100644
index e69bd2cd..00000000
--- a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
+++ /dev/null
@@ -1,257 +0,0 @@
-"""
-"""
-import argparse
-import os
-import signal
-import sys
-import time
-import traceback
-import math
-from functools import partial
-
-import numpy
-import pandas
-
-from mhcnames import normalize_allele_name
-import tqdm  # progress bar
-tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
-
-from mhcflurry.class1_affinity_predictor import Class1AffinityPredictor
-from mhcflurry.common import configure_logging
-from mhcflurry.local_parallelism import (
-    add_local_parallelism_args,
-    worker_pool_with_gpu_assignments_from_args,
-    call_wrapped_kwargs)
-from mhcflurry.cluster_parallelism import (
-    add_cluster_parallelism_args,
-    cluster_results_from_args)
-
-
-# To avoid pickling large matrices to send to child processes when running in
-# parallel, we use this global variable as a place to store data. Data that is
-# stored here before creating the worker pool will be inherited by the child
-# processes upon fork(), allowing us to share large data with the workers
-# via shared memory.
-GLOBAL_DATA = {}
-
-parser = argparse.ArgumentParser(usage=__doc__)
-
-parser.add_argument(
-    "input_peptides",
-    metavar="CSV",
-    help="CSV file with 'peptide' column")
-parser.add_argument(
-    "--models-dir",
-    metavar="DIR",
-    required=True,
-    help="Directory to read MHCflurry models")
-parser.add_argument(
-    "--allele",
-    default=None,
-    required=True,
-    nargs="+",
-    help="Alleles to predict")
-parser.add_argument(
-    "--chunk-size",
-    type=int,
-    default=100000,
-    help="Num peptides per job. Default: %(default)s")
-parser.add_argument(
-    "--batch-size",
-    type=int,
-    default=4096,
-    help="Keras batch size for predictions. Default: %(default)s")
-parser.add_argument(
-    "--out",
-    metavar="DIR",
-    help="Write results to DIR")
-parser.add_argument(
-    "--verbosity",
-    type=int,
-    help="Keras verbosity. Default: %(default)s",
-    default=0)
-parser.add_argument(
-    "--max-peptides",
-    type=int,
-    help="Max peptides to process. For debugging.",
-    default=None)
-
-
-add_local_parallelism_args(parser)
-add_cluster_parallelism_args(parser)
-
-
-def run(argv=sys.argv[1:]):
-    global GLOBAL_DATA
-
-    # On SIGUSR1, print stack trace
-    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
-    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())
-
-    args = parser.parse_args(argv)
-
-    args.models_dir = os.path.abspath(args.models_dir)
-
-    configure_logging(verbose=args.verbosity > 1)
-
-    serial_run = not args.cluster_parallelism and args.num_jobs == 0
-
-    # It's important that we don't trigger a Keras import here since that
-    # breaks local parallelism (tensorflow backend). So we set
-    # optimization_level=0.
-    predictor = Class1AffinityPredictor.load(
-        args.models_dir,
-        optimization_level=0,
-    )
-
-    alleles = [normalize_allele_name(a) for a in args.allele]
-    alleles = sorted(set(alleles))
-
-    peptides = pandas.read_csv(
-        args.input_peptides, nrows=args.max_peptides).peptide.drop_duplicates()
-    print("Filtering to valid peptides. Starting at: ", len(peptides))
-    peptides = peptides[peptides.str.match("^[ACDEFGHIKLMNPQRSTVWY]+$")]
-    print("Filtered to: ", len(peptides))
-    peptides = peptides.unique()
-    num_peptides = len(peptides)
-
-    print("Predictions for %d alleles x %d peptides." % (
-        len(alleles), num_peptides))
-
-    if not os.path.exists(args.out):
-        print("Creating", args.out)
-        os.mkdir(args.out)
-
-    # Write peptide and allele lists to out dir.
-    out_peptides = os.path.abspath(os.path.join(args.out, "peptides.csv"))
-    pandas.DataFrame({"peptide": peptides}).to_csv(out_peptides, index=False)
-    print("Wrote: ", out_peptides)
-    allele_to_file_path = dict(
-        (allele, "%s.npz" % (allele.replace("*", ""))) for allele in alleles)
-    out_alleles = os.path.abspath(os.path.join(args.out, "alleles.csv"))
-    pandas.DataFrame({
-        'allele': alleles,
-        'path': [allele_to_file_path[allele] for allele in alleles],
-    }).to_csv(out_alleles, index=False)
-    print("Wrote: ", out_alleles)
-
-    num_chunks = int(math.ceil(len(peptides) / args.chunk_size))
-    print("Splitting peptides into %d chunks" % num_chunks)
-    peptide_chunks = numpy.array_split(peptides, num_chunks)
-
-    GLOBAL_DATA["predictor"] = predictor
-    GLOBAL_DATA["args"] = {
-        'verbose': args.verbosity > 0,
-        'model_kwargs': {
-            'batch_size': args.batch_size,
-        }
-    }
-
-    work_items = []
-    for (chunk_index, chunk_peptides) in enumerate(peptide_chunks):
-        work_item = {
-            'alleles': alleles,
-            'chunk_index': chunk_index,
-            'peptides': chunk_peptides,
-        }
-        work_items.append(work_item)
-    print("Work items: ", len(work_items))
-
-    worker_pool = None
-    start = time.time()
-    if serial_run:
-        # Serial run
-        print("Running in serial.")
-        results = (
-            do_predictions(**item) for item in work_items)
-    elif args.cluster_parallelism:
-        # Run using separate processes on an HPC cluster.
- print("Running on cluster.") - results = cluster_results_from_args( - args, - work_function=do_predictions, - work_items=work_items, - constant_data=GLOBAL_DATA, - input_serialization_method="dill", - result_serialization_method="pickle", - clear_constant_data=True) - else: - worker_pool = worker_pool_with_gpu_assignments_from_args(args) - print("Worker pool", worker_pool) - assert worker_pool is not None - results = worker_pool.imap_unordered( - partial(call_wrapped_kwargs, do_predictions), - work_items, - chunksize=1) - - allele_to_chunk_index_to_predictions = {} - for allele in alleles: - allele_to_chunk_index_to_predictions[allele] = {} - - for (chunk_index, allele_to_predictions) in tqdm.tqdm( - results, total=len(work_items)): - for (allele, predictions) in allele_to_predictions.items(): - chunk_index_to_predictions = allele_to_chunk_index_to_predictions[ - allele - ] - assert chunk_index not in chunk_index_to_predictions - chunk_index_to_predictions[chunk_index] = predictions - - if len(allele_to_chunk_index_to_predictions[allele]) == num_chunks: - chunk_predictions = sorted(chunk_index_to_predictions.items()) - assert [i for (i, _) in chunk_predictions] == list( - range(num_chunks)) - predictions = numpy.concatenate([ - predictions for (_, predictions) in chunk_predictions - ]) - assert len(predictions) == num_peptides - out_path = os.path.join( - args.out, allele.replace("*", "")) + ".npz" - out_path = os.path.abspath(out_path) - numpy.savez(out_path, predictions) - print("Wrote:", out_path) - - del allele_to_chunk_index_to_predictions[allele] - - assert not allele_to_chunk_index_to_predictions, ( - "Not all results written: ", allele_to_chunk_index_to_predictions) - - if worker_pool: - worker_pool.close() - worker_pool.join() - - prediction_time = time.time() - start - print("Done generating predictions in %0.2f min." % ( - prediction_time / 60.0)) - - -def do_predictions(chunk_index, peptides, alleles, constant_data=None): - # This may run on the cluster in a way that misses all top level imports, - # so we have to re-import everything here. 
-    import time
-    from mhcflurry.encodable_sequences import EncodableSequences
-
-    if constant_data is None:
-        constant_data = GLOBAL_DATA
-
-    predictor = constant_data['predictor']
-    verbose = constant_data['args'].get("verbose", False)
-    model_kwargs = constant_data['args'].get("model_kwargs", {})
-
-    predictor.optimize(warn=False)  # since we loaded with optimization_level=0
-    start = time.time()
-    results = {}
-    peptides = EncodableSequences.create(peptides)
-    for (i, allele) in enumerate(alleles):
-        print("Processing allele %d / %d: %0.2f sec elapsed" % (
-            i + 1, len(alleles), time.time() - start))
-        results[allele] = predictor.predict(
-            peptides=peptides,
-            allele=allele,
-            throw=False,
-            model_kwargs=model_kwargs).astype('float32')
-    print("Done predicting in", time.time() - start, "sec")
-    return (chunk_index, results)
-
-
-if __name__ == '__main__':
-    run()
diff --git a/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
similarity index 97%
rename from downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py
rename to downloads-generation/data_mass_spec_benchmark/run_predictors.py
index 26b5c694..fd871ff6 100644
--- a/downloads-generation/data_mass_spec_benchmark/run_thirdparty_predictors.py
+++ b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
@@ -75,6 +75,7 @@ parser.add_argument(
 parser.add_argument(
     "--reuse-predictions",
     metavar="DIR",
+    nargs="+",
     help="Take predictions from indicated DIR instead of re-running them")
 
 add_local_parallelism_args(parser)
@@ -214,10 +215,11 @@ def run(argv=sys.argv[1:]):
         result_df[:] = numpy.nan
 
     if args.reuse_predictions:
-        print("Loading predictions", args.reuse_predictions)
-        result_df = load_results(args.reuse_predictions, result_df)
-        print("Existing data filled %f%% entries" % (
-            result_df.notnull().values.mean()))
+        for dirname in args.reuse_predictions:
+            print("Loading predictions", dirname)
+            result_df = load_results(dirname, result_df)
+        print("Existing data filled %0.2f%% of entries" % (
+            100.0 * result_df.notnull().values.mean()))
 
     # We rerun any alleles that have nulls for any kind of value
     # (affinity, percentile rank, elution score).
-- 
GitLab
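
Note on the final hunk above: --reuse-predictions now takes one or more directories (nargs="+"), each folded into result_df in turn, and the fill-rate printout scales the notnull fraction by 100 so the %% format reads as a true percentage. Below is a minimal self-contained sketch of that behavior, using hypothetical DataFrames in place of load_results output; load_results itself is not shown in this patch, so the fill-only merge semantics here are an assumption.

    import numpy
    import pandas

    # Hypothetical stand-ins for predictions loaded from two
    # --reuse-predictions directories; the real script builds these
    # via load_results(dirname, result_df).
    run1 = pandas.DataFrame({"affinity": [1.0, numpy.nan, numpy.nan]})
    run2 = pandas.DataFrame({"affinity": [numpy.nan, 2.0, numpy.nan]})

    # Fold each source into the result in order, filling only entries
    # that are still null (assumed to match load_results' behavior).
    result_df = run1.copy()
    for frame in [run2]:
        result_df = result_df.combine_first(frame)

    # notnull().values.mean() is a fraction in [0, 1]; multiply by 100
    # before formatting with %% to report a percentage.
    print("Existing data filled %0.2f%% of entries" % (
        100.0 * result_df.notnull().values.mean()))  # prints 66.67%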