diff --git a/downloads-generation/data_mass_spec_benchmark/run_predictors.py b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
index 7b58966f114f35309aea3b1c6f8a8ffc9ae6138a..3c1a747f9f7a0f26b787194c64a3b8b837b0cc48 100644
--- a/downloads-generation/data_mass_spec_benchmark/run_predictors.py
+++ b/downloads-generation/data_mass_spec_benchmark/run_predictors.py
@@ -244,16 +244,20 @@ def run(argv=sys.argv[1:]):
 
     # We rerun any alleles that have nulls for any kind of value
     # (e.g. affinity, percentile rank, elution score).
+    is_null_matrix = pandas.DataFrame(columns=alleles, dtype="int8")
+    for (allele, sub_df) in manifest_df.groupby("allele"):
+        is_null_matrix[allele] = result_df[sub_df.col.values].isnull().any(1)
+    print("Fraction null", is_null_matrix.values.mean())
+
     print("Computing blocks.")
     start = time.time()
-    blocks = blocks_of_ones(result_df.isnull().values)
+    blocks = blocks_of_ones(is_null_matrix.values)
     print("Found %d blocks in %f sec." % (
         len(blocks), (time.time() - start)))
 
     work_items = []
     for (row_index1, col_index1, row_index2, col_index2) in blocks:
-        block_cols = result_df.columns[col_index1 : col_index2 + 1]
-        block_alleles = sorted(set([x.split()[0] for x in block_cols]))
+        block_alleles = is_null_matrix.columns[col_index1 : col_index2 + 1]
         block_peptides = result_df.index[row_index1 : row_index2 + 1]
 
         print("Block: ", row_index1, col_index1, row_index2, col_index2)
@@ -285,6 +289,26 @@ def run(argv=sys.argv[1:]):
     for (i, work_item) in enumerate(work_items):
         work_item["work_item_num"] = i
 
+    # Combine work items to form tasks.
+    tasks = []
+    peptides_in_last_task = None
+    # We sort work_items to put small items first so they get combined.
+    for work_item in sorted(work_items, key=lambda d: len(d['peptides'])):
+        if peptides_in_last_task is not None and (
+                len(work_item['peptides']) +
+                peptides_in_last_task < args.chunk_size):
+
+            # Add to last task.
+            tasks[-1]['work_item_dicts'].append(work_item)
+            peptides_in_last_task += len(work_item['peptides'])
+        else:
+            # New task.
+            tasks.append({'work_item_dicts': [work_item]})
+            peptides_in_last_task = len(work_item['peptides'])
+
+    print("Collected %d work items into %d tasks" % (
+        len(work_items), len(tasks)))
+
     if args.predictor == "mhcflurry":
         do_predictions_function = do_predictions_mhcflurry
     else:
@@ -296,14 +320,14 @@ def run(argv=sys.argv[1:]):
         # Serial run
         print("Running in serial.")
         results = (
-            do_predictions_function(**item) for item in work_items)
+            do_predictions_function(**task) for task in tasks)
     elif args.cluster_parallelism:
         # Run using separate processes on an HPC cluster.
         print("Running on cluster.")
         results = cluster_results_from_args(
             args,
             work_function=do_predictions_function,
-            work_items=work_items,
+            work_items=tasks,
             constant_data=GLOBAL_DATA,
             input_serialization_method="dill",
             result_serialization_method="pickle",
@@ -314,7 +338,7 @@ def run(argv=sys.argv[1:]):
         assert worker_pool is not None
         results = worker_pool.imap_unordered(
             partial(call_wrapped_kwargs, do_predictions_function),
-            work_items,
+            tasks,
             chunksize=1)
 
     allele_to_chunk_index_to_predictions = {}
@@ -332,16 +356,16 @@ def run(argv=sys.argv[1:]):
                 result_df[col].isnull().mean() * 100.0),
             out_path)
 
-    for (work_item_num, col_to_predictions) in tqdm.tqdm(
-            results, total=len(work_items)):
-        for (col, predictions) in col_to_predictions.items():
-            result_df.loc[
-                work_items[work_item_num]['peptides'],
-                col
-            ] = predictions
-            if time.time() - last_write_time_per_column[col] > 180:
-                write_col(col)
-                last_write_time_per_column[col] = time.time()
+    for worker_results in tqdm.tqdm(results, total=len(tasks)):
+        for (work_item_num, col_to_predictions) in worker_results:
+            for (col, predictions) in col_to_predictions.items():
+                result_df.loc[
+                    work_items[work_item_num]['peptides'],
+                    col
+                ] = predictions
+                if time.time() - last_write_time_per_column[col] > 180:
+                    write_col(col)
+                    last_write_time_per_column[col] = time.time()
 
     print("Done processing. Final write for each column.")
     for col in result_df.columns:
@@ -359,8 +383,14 @@ def run(argv=sys.argv[1:]):
         prediction_time / 60.0))
 
 
-def do_predictions_mhctools(
-        work_item_num, peptides, alleles, constant_data=None):
+def do_predictions_mhctools(work_item_dicts, constant_data=None):
+    """
+    Each dict of work items should have keys:
+
+    work_item_num, peptides, alleles
+
+    """
+
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -371,28 +401,43 @@
 
     if constant_data is None:
         constant_data = GLOBAL_DATA
 
+    cols = constant_data['cols']
     predictor_name = constant_data['args'].predictor
-    if predictor_name == "netmhcpan4":
-        predictor = mhctools.NetMHCpan4(
-            alleles=alleles, program_name="netMHCpan-4.0")
-    else:
-        raise ValueError("Unsupported", predictor_name)
-    cols = constant_data['cols']
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
 
-    start = time.time()
-    df = predictor.predict_peptides_dataframe(peptides)
-    print("Generated predictions for %d peptides x %d alleles in %0.2f sec." % (
-        len(peptides), len(alleles), (time.time() - start)))
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+
+        if predictor_name == "netmhcpan4":
+            predictor = mhctools.NetMHCpan4(
+                alleles=alleles, program_name="netMHCpan-4.0")
+        else:
+            raise ValueError("Unsupported", predictor_name)
+
+        start = time.time()
+        df = predictor.predict_peptides_dataframe(peptides)
+        print("Predicted for %d peptides x %d alleles in %0.2f sec." % (
+            len(peptides), len(alleles), (time.time() - start)))
+
+        for (allele, sub_df) in df.groupby("allele"):
+            for col in cols:
+                result["%s %s" % (allele, col)] = (
+                    sub_df[col].values.astype('float32'))
+    return results
 
-    results = {}
-    for (allele, sub_df) in df.groupby("allele"):
-        for col in cols:
-            results["%s %s" % (allele, col)] = sub_df[col].values.astype('float32')
-    return (work_item_num, results)
 
 
+def do_predictions_mhcflurry(work_item_dicts, constant_data=None):
+    """
+    Each dict of work items should have keys: work_item_num, peptides, alleles
+
+    """
-def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=None):
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -409,22 +454,30 @@ def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=Non
     predictor = Class1AffinityPredictor.load(args.mhcflurry_models_dir)
 
-    start = time.time()
-    results = {}
-    peptides = EncodableSequences.create(peptides)
-    for (i, allele) in enumerate(alleles):
-        print("Processing allele %d / %d: %0.2f sec elapsed" % (
-            i + 1, len(alleles), time.time() - start))
-        for col in ["affinity"]:
-            results["%s %s" % (allele, col)] = predictor.predict(
-                peptides=peptides,
-                allele=allele,
-                throw=False,
-                model_kwargs={
-                    'batch_size': args.mhcflurry_batch_size,
-                }).astype('float32')
-    print("Done predicting in", time.time() - start, "sec")
-    return (work_item_num, results)
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
+
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+
+        start = time.time()
+        peptides = EncodableSequences.create(peptides)
+        for (j, allele) in enumerate(alleles):
+            print("Processing allele %d / %d: %0.2f sec elapsed" % (
+                j + 1, len(alleles), time.time() - start))
+            for col in ["affinity"]:
+                result["%s %s" % (allele, col)] = predictor.predict(
+                    peptides=peptides,
+                    allele=allele,
+                    throw=False,
+                    model_kwargs={
+                        'batch_size': args.mhcflurry_batch_size,
+                    }).astype('float32')
+        print("Done predicting in", time.time() - start, "sec")
+    return results
 
 
 if __name__ == '__main__':
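Note on the first hunk: the patch switches from finding null blocks over the full `result_df` (one column per allele/score-kind pair) to a smaller per-allele matrix, so one block can cover an allele whose different columns are null in different rows. The sketch below illustrates only the null-matrix construction, with invented data; here `manifest_df` is derived from the column names for the example, and `blocks_of_ones` (defined elsewhere in the script) is not shown.

```python
import numpy
import pandas

# Invented result_df: one column per "<allele> <score kind>" pair.
result_df = pandas.DataFrame({
    "HLA-A*02:01 affinity": [1.0, numpy.nan, 3.0],
    "HLA-A*02:01 percentile_rank": [0.5, 0.1, numpy.nan],
    "HLA-B*07:02 affinity": [2.0, 2.5, 2.7],
    "HLA-B*07:02 percentile_rank": [0.9, 0.8, 0.7],
}, index=["SIINFEKL", "SIINFEKV", "SIINFEKQ"])

# Toy manifest mapping each column to its allele.
manifest_df = pandas.DataFrame({"col": result_df.columns})
manifest_df["allele"] = manifest_df.col.str.split().str.get(0)

# An allele needs rerunning for a peptide if ANY of its columns is null there.
alleles = manifest_df.allele.unique()
is_null_matrix = pandas.DataFrame(index=result_df.index, columns=alleles)
for (allele, sub_df) in manifest_df.groupby("allele"):
    is_null_matrix[allele] = result_df[sub_df.col.values].isnull().any(axis=1)

print(is_null_matrix)
# HLA-A*02:01 is True for SIINFEKV and SIINFEKQ; HLA-B*07:02 is all False,
# so only one rectangular block needs to be recomputed.
```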
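The task-packing loop in the second hunk is a greedy first-fit over size-sorted items: sorting smallest-first lets several small work items share one task, while any item with at least `chunk_size` peptides ends up alone. A minimal standalone sketch of that strategy, using made-up work items:

```python
def pack_tasks(work_items, chunk_size):
    """Greedily combine small work items into tasks, smallest first."""
    tasks = []
    peptides_in_last_task = None
    for work_item in sorted(work_items, key=lambda d: len(d['peptides'])):
        n = len(work_item['peptides'])
        if peptides_in_last_task is not None and (
                n + peptides_in_last_task < chunk_size):
            # Room left: add to the last task.
            tasks[-1]['work_item_dicts'].append(work_item)
            peptides_in_last_task += n
        else:
            # Start a new task.
            tasks.append({'work_item_dicts': [work_item]})
            peptides_in_last_task = n
    return tasks

items = [
    {'work_item_num': 0, 'peptides': ['SIINFEKL'] * 3, 'alleles': ['HLA-A*02:01']},
    {'work_item_num': 1, 'peptides': ['SIINFEKL'] * 2, 'alleles': ['HLA-B*07:02']},
    {'work_item_num': 2, 'peptides': ['SIINFEKL'] * 9, 'alleles': ['HLA-A*01:01']},
]
tasks = pack_tasks(items, chunk_size=6)
print(len(tasks))  # 2: the two small items share a task, the large one is alone
```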
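Finally, the collection loop now expects each task to yield a list of `(work_item_num, {column name: float32 array})` pairs rather than a single pair, and uses the originating work item's peptide list to pick the rows to fill. A toy illustration of that scatter step, with mock data standing in for real predictions:

```python
import numpy
import pandas

# Mock result frame and work item list matching the contract above.
result_df = pandas.DataFrame(
    index=["SIINFEKL", "SIINFEKV"],
    columns=["HLA-A*02:01 affinity"], dtype="float32")
work_items = [
    {'work_item_num': 0, 'peptides': ["SIINFEKL", "SIINFEKV"],
     'alleles': ["HLA-A*02:01"]},
]

# One task's return value: a list of (work_item_num, col -> predictions).
worker_results = [
    (0, {"HLA-A*02:01 affinity": numpy.array([120.5, 350.2], dtype="float32")}),
]

for (work_item_num, col_to_predictions) in worker_results:
    for (col, predictions) in col_to_predictions.items():
        # The work item's peptides select which rows receive the values.
        result_df.loc[work_items[work_item_num]['peptides'], col] = predictions

print(result_df)
```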