From d3184f295f0841c30826cdca1af837b0dc82be99 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Fri, 4 Oct 2019 09:03:17 -0400
Subject: [PATCH] Batch work items into tasks and compute nulls per allele

---
 .../run_predictors.py                         | 153 ++++++++++++------
 1 file changed, 103 insertions(+), 50 deletions(-)

diff --git a/downloads-generation/data_mass_spec_benchmark/run_predictors.py b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
index 7b58966f..3c1a747f 100644
--- a/downloads-generation/data_mass_spec_benchmark/run_predictors.py
+++ b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
@@ -244,16 +244,20 @@ def run(argv=sys.argv[1:]):
 
         # We rerun any alleles that have nulls for any kind of value
         # (e.g. affinity, percentile rank, elution score).
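+        # Build a boolean matrix of shape (peptides, alleles): an entry is
+        # True if any of that allele's prediction columns is null for the
+        # peptide.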
+        is_null_matrix = pandas.DataFrame(columns=alleles, dtype=bool)
+        for (allele, sub_df) in manifest_df.groupby("allele"):
+            is_null_matrix[allele] = result_df[sub_df.col.values].isnull().any(axis=1)
+        print("Fraction null", is_null_matrix.values.mean())
+
         print("Computing blocks.")
         start = time.time()
-        blocks = blocks_of_ones(result_df.isnull().values)
+        blocks = blocks_of_ones(is_null_matrix.values)
         print("Found %d blocks in %f sec." % (
             len(blocks), (time.time() - start)))
 
         work_items = []
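+        # Each block from blocks_of_ones is an inclusive rectangle
+        # (row1, col1, row2, col2) of all-null entries; turn it into one
+        # work item covering the corresponding peptides and alleles.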
         for (row_index1, col_index1, row_index2, col_index2) in blocks:
-            block_cols = result_df.columns[col_index1 : col_index2 + 1]
-            block_alleles = sorted(set([x.split()[0] for x in block_cols]))
+            block_alleles = is_null_matrix.columns[col_index1 : col_index2 + 1]
             block_peptides = result_df.index[row_index1 : row_index2 + 1]
 
             print("Block: ", row_index1, col_index1, row_index2, col_index2)
@@ -285,6 +289,26 @@ def run(argv=sys.argv[1:]):
     for (i, work_item) in enumerate(work_items):
         work_item["work_item_num"] = i
 
+    # Combine work items to form tasks.
+    tasks = []
+    peptides_in_last_task = None
+    # Sort work items smallest-first so that small items get packed together.
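+    # Greedy packing: append each item to the last task while the task stays
+    # under args.chunk_size peptides; otherwise start a new task.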
+    for work_item in sorted(work_items, key=lambda d: len(d['peptides'])):
+        if peptides_in_last_task is not None and (
+                len(work_item['peptides']) +
+                peptides_in_last_task < args.chunk_size):
+
+            # Add to last task.
+            tasks[-1]['work_item_dicts'].append(work_item)
+            peptides_in_last_task += len(work_item['peptides'])
+        else:
+            # New task
+            tasks.append({'work_item_dicts': [work_item]})
+            peptides_in_last_task = len(work_item['peptides'])
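+    # Illustrative task structure (values hypothetical):
+    #   {'work_item_dicts': [
+    #       {'work_item_num': 12, 'peptides': [...], 'alleles': [...]},
+    #       ...]}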
+
+    print("Collected %d work items into %d tasks" % (
+        len(work_items), len(tasks)))
+
     if args.predictor == "mhcflurry":
         do_predictions_function = do_predictions_mhcflurry
     else:
@@ -296,14 +320,14 @@ def run(argv=sys.argv[1:]):
         # Serial run
         print("Running in serial.")
         results = (
-            do_predictions_function(**item) for item in work_items)
+            do_predictions_function(**task) for task in tasks)
     elif args.cluster_parallelism:
         # Run using separate processes on an HPC cluster.
         print("Running on cluster.")
         results = cluster_results_from_args(
             args,
             work_function=do_predictions_function,
-            work_items=work_items,
+            work_items=tasks,
             constant_data=GLOBAL_DATA,
             input_serialization_method="dill",
             result_serialization_method="pickle",
@@ -314,7 +338,7 @@ def run(argv=sys.argv[1:]):
         assert worker_pool is not None
         results = worker_pool.imap_unordered(
             partial(call_wrapped_kwargs, do_predictions_function),
-            work_items,
+            tasks,
             chunksize=1)
 
     allele_to_chunk_index_to_predictions = {}
@@ -332,16 +356,16 @@ def run(argv=sys.argv[1:]):
                 result_df[col].isnull().mean() * 100.0),
             out_path)
 
-    for (work_item_num, col_to_predictions) in tqdm.tqdm(
-            results, total=len(work_items)):
-        for (col, predictions) in col_to_predictions.items():
-            result_df.loc[
-                work_items[work_item_num]['peptides'],
-                col
-            ] = predictions
-            if time.time() - last_write_time_per_column[col] > 180:
-                write_col(col)
-                last_write_time_per_column[col] = time.time()
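+    # Each task result is a list of (work_item_num, col -> predictions)
+    # tuples. Write predictions into result_df, persisting each column at
+    # most once every 180 sec.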
+    for worker_results in tqdm.tqdm(results, total=len(tasks)):
+        for (work_item_num, col_to_predictions) in worker_results:
+            for (col, predictions) in col_to_predictions.items():
+                result_df.loc[
+                    work_items[work_item_num]['peptides'],
+                    col
+                ] = predictions
+                if time.time() - last_write_time_per_column[col] > 180:
+                    write_col(col)
+                    last_write_time_per_column[col] = time.time()
 
     print("Done processing. Final write for each column.")
     for col in result_df.columns:
@@ -359,8 +383,14 @@ def run(argv=sys.argv[1:]):
         prediction_time / 60.0))
 
 
-def do_predictions_mhctools(
-        work_item_num, peptides, alleles, constant_data=None):
+def do_predictions_mhctools(work_item_dicts, constant_data=None):
+    """
+    Run predictions with mhctools.
+
+    Each dict in work_item_dicts has keys: work_item_num, peptides, alleles.
+    """
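+    # Returns: list of (work_item_num, {"<allele> <col>": float32 array}) tuples.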
+
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -371,28 +401,43 @@ def do_predictions_mhctools(
     if constant_data is None:
         constant_data = GLOBAL_DATA
 
+    cols = constant_data['cols']
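+    # 'cols' names the per-allele prediction columns this predictor fills in
+    # (e.g. affinity, percentile rank, elution score).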
     predictor_name = constant_data['args'].predictor
-    if predictor_name == "netmhcpan4":
-        predictor = mhctools.NetMHCpan4(
-            alleles=alleles, program_name="netMHCpan-4.0")
-    else:
-        raise ValueError("Unsupported", predictor_name)
 
-    cols = constant_data['cols']
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
 
-    start = time.time()
-    df = predictor.predict_peptides_dataframe(peptides)
-    print("Generated predictions for %d peptides x %d alleles in %0.2f sec." % (
-        len(peptides), len(alleles), (time.time() - start)))
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+
+        if predictor_name == "netmhcpan4":
+            predictor = mhctools.NetMHCpan4(
+                alleles=alleles, program_name="netMHCpan-4.0")
+        else:
+            raise ValueError("Unsupported predictor: %s" % predictor_name)
+
+        start = time.time()
+        df = predictor.predict_peptides_dataframe(peptides)
+        print("Predicted for %d peptides x %d alleles in %0.2f sec." % (
+            len(peptides), len(alleles), (time.time() - start)))
+
+        for (allele, sub_df) in df.groupby("allele"):
+            for col in cols:
+                result["%s %s" % (allele, col)] = (
+                    sub_df[col].values.astype('float32'))
+    return results
 
-    results = {}
-    for (allele, sub_df) in df.groupby("allele"):
-        for col in cols:
-            results["%s %s" % (allele, col)] = sub_df[col].values.astype('float32')
-    return (work_item_num, results)
 
+def do_predictions_mhcflurry(work_item_dicts, constant_data=None):
+    """
+    Run predictions with MHCflurry.
+
+    Each dict in work_item_dicts has keys: work_item_num, peptides, alleles.
+    """
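+    # Returns: list of (work_item_num, {"<allele> affinity": float32 array}) tuples.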
 
-def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=None):
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -409,22 +454,30 @@ def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=Non
 
     predictor = Class1AffinityPredictor.load(args.mhcflurry_models_dir)
 
-    start = time.time()
-    results = {}
-    peptides = EncodableSequences.create(peptides)
-    for (i, allele) in enumerate(alleles):
-        print("Processing allele %d / %d: %0.2f sec elapsed" % (
-            i + 1, len(alleles), time.time() - start))
-        for col in ["affinity"]:
-            results["%s %s" % (allele, col)] = predictor.predict(
-                peptides=peptides,
-                allele=allele,
-                throw=False,
-                model_kwargs={
-                    'batch_size': args.mhcflurry_batch_size,
-                }).astype('float32')
-    print("Done predicting in", time.time() - start, "sec")
-    return (work_item_num, results)
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
+
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+        start = time.time()
+        peptides = EncodableSequences.create(peptides)
+        for (j, allele) in enumerate(alleles):
+            print("Processing allele %d / %d: %0.2f sec elapsed" % (
+                j + 1, len(alleles), time.time() - start))
+            for col in ["affinity"]:
+                result["%s %s" % (allele, col)] = predictor.predict(
+                    peptides=peptides,
+                    allele=allele,
+                    throw=False,
+                    model_kwargs={
+                        'batch_size': args.mhcflurry_batch_size,
+                    }).astype('float32')
+        print("Done predicting in", time.time() - start, "sec")
+    return results
 
 
 if __name__ == '__main__':
-- 
GitLab