From 733d8f4f1ea0f7d7d482c5ad445076e30806db94 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Tue, 1 Oct 2019 10:11:34 -0400
Subject: [PATCH] fix

---
 .../data_mass_spec_benchmark/GENERATE.sh      |   3 +-
 .../data_mass_spec_benchmark/run_mhcflurry.py | 114 +++++++++---------
 mhcflurry/class1_affinity_predictor.py        |   5 +-
 3 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
index fde917c4..1818ddbf 100755
--- a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
+++ b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
@@ -62,8 +62,9 @@ for kind in with_mass_spec no_mass_spec
 do
     python run_mhcflurry.py \
         proteome_peptides.csv.bz2 \
-        --chunk-size 10000000 \
+        --chunk-size 100000 \
         --models-dir "$(mhcflurry-downloads path models_class1_pan)/models.$kind" \
+        --batch-size 65536 \
         --allele $(cat alleles.txt) \
         --out "predictions/mhcflurry.$kind" \
         --num-jobs $NUM_JOBS --max-tasks-per-worker 1 --gpus $GPUS --max-workers-per-gpu 1
diff --git a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
index a2602e36..8cb55779 100644
--- a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
+++ b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
@@ -56,7 +56,7 @@ parser.add_argument(
 parser.add_argument(
     "--chunk-size",
     type=int,
-    default=100000000,
+    default=100000,
     help="Num peptides per job. Default: %(default)s")
 parser.add_argument(
     "--batch-size",
@@ -76,6 +76,12 @@ parser.add_argument(
     type=int,
     help="Keras verbosity. Default: %(default)s",
     default=0)

+parser.add_argument(
+    "--max-peptides",
+    type=int,
+    help="Max peptides to process. For debugging.",
+    default=None)
+
 add_local_parallelism_args(parser)
 add_cluster_parallelism_args(parser)
@@ -97,18 +103,17 @@ def run(argv=sys.argv[1:]):
     serial_run = not args.cluster_parallelism and args.num_jobs == 0

     # It's important that we don't trigger a Keras import here since that breaks
-    # local parallelism (tensorflow backend). So we set optimization_level=0 if
-    # using local parallelism.
+    # local parallelism (tensorflow backend). So we set optimization_level=0.
     predictor = Class1AffinityPredictor.load(
         args.models_dir,
-        #optimization_level=None if serial_run or args.cluster_parallelism else 0,
         optimization_level=0,
     )

     alleles = [normalize_allele_name(a) for a in args.allele]
     alleles = sorted(set(alleles))

-    peptides = pandas.read_csv(args.input_peptides).peptide.drop_duplicates()
+    peptides = pandas.read_csv(
+        args.input_peptides, nrows=args.max_peptides).peptide.drop_duplicates()
     print("Filtering to valid peptides. Starting at: ", len(peptides))
     peptides = peptides[peptides.str.match("^[ACDEFGHIKLMNPQRSTVWY]+$")]
     print("Filtered to: ", len(peptides))
@@ -136,11 +141,8 @@ def run(argv=sys.argv[1:]):
     print("Wrote: ", out_alleles)

     num_chunks = int(math.ceil(len(peptides) / args.chunk_size))
-    print("Split peptides into %d chunks" % num_chunks)
-    peptide_chunks = [
-        EncodableSequences.create(chunk)
-        for chunk in numpy.array_split(peptides, num_chunks)
-    ]
+    print("Splitting peptides into %d chunks" % num_chunks)
+    peptide_chunks = numpy.array_split(peptides, num_chunks)

     GLOBAL_DATA["predictor"] = predictor
     GLOBAL_DATA["args"] = {
@@ -151,14 +153,13 @@ def run(argv=sys.argv[1:]):
     }

     work_items = []
-    for allele in alleles:
-        for (chunk_index, chunk_peptides) in enumerate(peptide_chunks):
-            work_item = {
-                'allele': allele,
-                'chunk_index': chunk_index,
-                'chunk_peptides': chunk_peptides,
-            }
-            work_items.append(work_item)
+    for (chunk_index, chunk_peptides) in enumerate(peptide_chunks):
+        work_item = {
+            'alleles': alleles,
+            'chunk_index': chunk_index,
+            'peptides': chunk_peptides,
+        }
+        work_items.append(work_item)
     print("Work items: ", len(work_items))

     worker_pool = None
@@ -191,27 +192,30 @@ def run(argv=sys.argv[1:]):
     for allele in alleles:
         allele_to_chunk_index_to_predictions[allele] = {}

-    for (allele, chunk_index, predictions) in tqdm.tqdm(
+    for (chunk_index, allele_to_predictions) in tqdm.tqdm(
             results, total=len(work_items)):
-
-        chunk_index_to_predictions = allele_to_chunk_index_to_predictions[allele]
-
-        assert chunk_index not in chunk_index_to_predictions
-        chunk_index_to_predictions[chunk_index] = predictions
-
-        if len(allele_to_chunk_index_to_predictions[allele]) == num_chunks:
-            chunk_predictions = sorted(chunk_index_to_predictions.items())
-            assert [i for (i, _) in chunk_predictions] == list(range(num_chunks))
-            predictions = numpy.concatenate([
-                predictions for (_, predictions) in chunk_predictions
-            ])
-            assert len(predictions) == num_peptides
-            out_path = os.path.join(args.out, allele.replace("*", "")) + ".npz"
-            out_path = os.path.abspath(out_path)
-            numpy.savez(out_path, predictions)
-            print("Wrote:", out_path)
-
-            del allele_to_chunk_index_to_predictions[allele]
+        for (allele, predictions) in allele_to_predictions.items():
+            chunk_index_to_predictions = allele_to_chunk_index_to_predictions[
+                allele
+            ]
+            assert chunk_index not in chunk_index_to_predictions
+            chunk_index_to_predictions[chunk_index] = predictions
+
+            if len(allele_to_chunk_index_to_predictions[allele]) == num_chunks:
+                chunk_predictions = sorted(chunk_index_to_predictions.items())
+                assert [i for (i, _) in chunk_predictions] == list(
+                    range(num_chunks))
+                predictions = numpy.concatenate([
+                    predictions for (_, predictions) in chunk_predictions
+                ])
+                assert len(predictions) == num_peptides
+                out_path = os.path.join(
+                    args.out, allele.replace("*", "")) + ".npz"
+                out_path = os.path.abspath(out_path)
+                numpy.savez(out_path, predictions)
+                print("Wrote:", out_path)
+
+                del allele_to_chunk_index_to_predictions[allele]

     assert not allele_to_chunk_index_to_predictions, (
         "Not all results written: ", allele_to_chunk_index_to_predictions)
@@ -225,35 +229,35 @@ def run(argv=sys.argv[1:]):
         prediction_time / 60.0))


-def do_predictions(allele, chunk_index, chunk_peptides, constant_data=GLOBAL_DATA):
+def do_predictions(chunk_index, peptides, alleles, constant_data=GLOBAL_DATA):
     return predict_for_allele(
-        allele, chunk_index,
-        chunk_peptides,
-        constant_data['predictor'],
+        chunk_index,
+        peptides,
+        alleles,
+        predictor=constant_data['predictor'],
         **constant_data["args"])


 def predict_for_allele(
-        allele,
         chunk_index,
-        chunk_peptides,
+        peptides,
+        alleles,
         predictor,
         verbose=False,
         model_kwargs={}):
-    if verbose:
-        print("Predicting", allele)
-    predictor.optimize()  # since we may have loaded with optimization_level=0
+    predictor.optimize(warn=False)  # since we loaded with optimization_level=0
     start = time.time()
-    predictions = predictor.predict(
-        peptides=chunk_peptides,
-        allele=allele,
-        throw=False,
-        model_kwargs=model_kwargs).astype('float32')
+    results = {}
+    peptides = EncodableSequences.create(peptides)
+    for allele in alleles:
+        results[allele] = predictor.predict(
+            peptides=peptides,
+            allele=allele,
+            throw=False,
+            model_kwargs=model_kwargs).astype('float32')
     if verbose:
-        print("Done predicting", allele, "in", time.time() - start, "sec")
-
-    return (allele, chunk_index, predictions)
+        print("Done predicting in", time.time() - start, "sec")
+    return (chunk_index, results)


 if __name__ == '__main__':
diff --git a/mhcflurry/class1_affinity_predictor.py b/mhcflurry/class1_affinity_predictor.py
index 57637888..740f7697 100644
--- a/mhcflurry/class1_affinity_predictor.py
+++ b/mhcflurry/class1_affinity_predictor.py
@@ -521,7 +521,7 @@ class Class1AffinityPredictor(object):
             "succeeded" if optimized else "not supported for these models")
         return result

-    def optimize(self):
+    def optimize(self, warn=True):
         """
         EXPERIMENTAL: Optimize the predictor for faster predictions.

@@ -545,7 +545,8 @@ class Class1AffinityPredictor(object):
                     merge_method="concatenate")
             ]
         except NotImplementedError as e:
-            logging.warning("Optimization failed: %s", str(e))
+            if warn:
+                logging.warning("Optimization failed: %s", str(e))
             return False
         self._manifest_df = None
         self.clear_cache()
--
GitLab