Commit d3184f29 authored by Tim O'Donnell

fix

parent d32c8f91
@@ -244,16 +244,20 @@ def run(argv=sys.argv[1:]):

     # We rerun any alleles that have nulls for any kind of values
     # (e.g. affinity, percentile rank, elution score).
+    is_null_matrix = pandas.DataFrame(columns=alleles, dtype="int8")
+    for (allele, sub_df) in manifest_df.groupby("allele"):
+        is_null_matrix[allele] = result_df[sub_df.col.values].isnull().any(1)
+    print("Fraction null", is_null_matrix.values.mean())
+
     print("Computing blocks.")
     start = time.time()
-    blocks = blocks_of_ones(result_df.isnull().values)
+    blocks = blocks_of_ones(is_null_matrix.values)
     print("Found %d blocks in %f sec." % (
         len(blocks), (time.time() - start)))

     work_items = []
     for (row_index1, col_index1, row_index2, col_index2) in blocks:
-        block_cols = result_df.columns[col_index1 : col_index2 + 1]
-        block_alleles = sorted(set([x.split()[0] for x in block_cols]))
+        block_alleles = is_null_matrix.columns[col_index1 : col_index2 + 1]
         block_peptides = result_df.index[row_index1 : row_index2 + 1]

         print("Block: ", row_index1, col_index1, row_index2, col_index2)
@@ -285,6 +289,26 @@ def run(argv=sys.argv[1:]):
     for (i, work_item) in enumerate(work_items):
         work_item["work_item_num"] = i

+
+    # Combine work items to form tasks.
+    tasks = []
+    peptides_in_last_task = None
+    # We sort work_items to put small items first so they get combined.
+    for work_item in sorted(work_items, key=lambda d: len(d['peptides'])):
+        if peptides_in_last_task is not None and (
+                len(work_item['peptides']) +
+                peptides_in_last_task < args.chunk_size):
+            # Add to last task.
+            tasks[-1]['work_item_dicts'].append(work_item)
+            peptides_in_last_task += len(work_item['peptides'])
+        else:
+            # New task
+            tasks.append({'work_item_dicts': [work_item]})
+            peptides_in_last_task = len(work_item['peptides'])
+
+    print("Collected %d work items into %d tasks" % (
+        len(work_items), len(tasks)))
+
     if args.predictor == "mhcflurry":
         do_predictions_function = do_predictions_mhcflurry
     else:
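The combining step added above is a greedy first-fit pass over size-sorted work items. A quick illustration of the grouping behavior, using a hypothetical chunk_size of 10 and made-up item sizes:

# Hypothetical work items: the only field the grouping reads is 'peptides'.
work_items = [
    {'peptides': ['A'] * 7},
    {'peptides': ['B'] * 2},
    {'peptides': ['C'] * 3},
    {'peptides': ['D'] * 4},
]
chunk_size = 10

tasks = []
peptides_in_last_task = None
for work_item in sorted(work_items, key=lambda d: len(d['peptides'])):
    if peptides_in_last_task is not None and (
            len(work_item['peptides']) + peptides_in_last_task < chunk_size):
        tasks[-1]['work_item_dicts'].append(work_item)
        peptides_in_last_task += len(work_item['peptides'])
    else:
        tasks.append({'work_item_dicts': [work_item]})
        peptides_in_last_task = len(work_item['peptides'])

# Sorted sizes are [2, 3, 4, 7]: 2 + 3 + 4 = 9 < 10 go into one task,
# then 7 starts a second task, so 4 work items become 2 tasks.
print([len(t['work_item_dicts']) for t in tasks])  # prints [3, 1]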
@@ -296,14 +320,14 @@ def run(argv=sys.argv[1:]):
         # Serial run
         print("Running in serial.")
         results = (
-            do_predictions_function(**item) for item in work_items)
+            do_predictions_function(**task) for task in tasks)
     elif args.cluster_parallelism:
         # Run using separate processes on an HPC cluster.
         print("Running on cluster.")
         results = cluster_results_from_args(
             args,
             work_function=do_predictions_function,
-            work_items=work_items,
+            work_items=tasks,
             constant_data=GLOBAL_DATA,
             input_serialization_method="dill",
             result_serialization_method="pickle",
@@ -314,7 +338,7 @@ def run(argv=sys.argv[1:]):
         assert worker_pool is not None
         results = worker_pool.imap_unordered(
             partial(call_wrapped_kwargs, do_predictions_function),
-            work_items,
+            tasks,
             chunksize=1)

     allele_to_chunk_index_to_predictions = {}
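call_wrapped_kwargs is defined in the project's parallelism helpers and is not shown in this diff. From its use with partial and imap_unordered here, it appears to expand each task dict into keyword arguments for the work function. A minimal sketch under that assumption, with a stub in place of the real prediction function:

from functools import partial
from multiprocessing import Pool

def call_wrapped_kwargs(function, kwargs):
    # Assumed behavior: imap_unordered feeds each task dict as a single
    # element, and this shim expands it into keyword arguments.
    return function(**kwargs)

def do_predictions_stub(work_item_dicts=None, constant_data=None):
    # Stand-in for do_predictions_mhcflurry / do_predictions_mhctools.
    return len(work_item_dicts)

if __name__ == '__main__':
    tasks = [{'work_item_dicts': [{}] * n} for n in (1, 2, 3)]
    with Pool(2) as worker_pool:
        results = worker_pool.imap_unordered(
            partial(call_wrapped_kwargs, do_predictions_stub),
            tasks,
            chunksize=1)
        print(sorted(results))  # prints [1, 2, 3]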
@@ -332,16 +356,16 @@ def run(argv=sys.argv[1:]):
                 result_df[col].isnull().mean() * 100.0),
             out_path)

-    for (work_item_num, col_to_predictions) in tqdm.tqdm(
-            results, total=len(work_items)):
-        for (col, predictions) in col_to_predictions.items():
-            result_df.loc[
-                work_items[work_item_num]['peptides'],
-                col
-            ] = predictions
-            if time.time() - last_write_time_per_column[col] > 180:
-                write_col(col)
-                last_write_time_per_column[col] = time.time()
+    for worker_results in tqdm.tqdm(results, total=len(work_items)):
+        for (work_item_num, col_to_predictions) in worker_results:
+            for (col, predictions) in col_to_predictions.items():
+                result_df.loc[
+                    work_items[work_item_num]['peptides'],
+                    col
+                ] = predictions
+                if time.time() - last_write_time_per_column[col] > 180:
+                    write_col(col)
+                    last_write_time_per_column[col] = time.time()

     print("Done processing. Final write for each column.")
     for col in result_df.columns:
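After this change, each element of results is a list of (work_item_num, col_to_predictions) pairs, one per work item in the task, rather than a single pair; hence the extra inner loop. A toy illustration of the new shape, with made-up allele names and values:

# Before: one (work_item_num, col_to_predictions) pair per result.
old_style_result = (0, {'HLA-A*02:01 affinity': [120.5, 873.2]})

# After: each task yields a list of pairs, one per work item it contained.
new_style_result = [
    (0, {'HLA-A*02:01 affinity': [120.5, 873.2]}),
    (3, {'HLA-B*07:02 affinity': [54.1]}),
]

for (work_item_num, col_to_predictions) in new_style_result:
    for (col, predictions) in col_to_predictions.items():
        print(work_item_num, col, predictions)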
@@ -359,8 +383,14 @@ def run(argv=sys.argv[1:]):
         prediction_time / 60.0))


-def do_predictions_mhctools(
-        work_item_num, peptides, alleles, constant_data=None):
+def do_predictions_mhctools(work_item_dicts, constant_data=None):
+    """
+    Each dict of work items has the keys:
+    (work_item_num, peptides, alleles)
+    """
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -371,28 +401,43 @@ def do_predictions_mhctools(
     if constant_data is None:
         constant_data = GLOBAL_DATA

-    cols = constant_data['cols']
     predictor_name = constant_data['args'].predictor
-    if predictor_name == "netmhcpan4":
-        predictor = mhctools.NetMHCpan4(
-            alleles=alleles, program_name="netMHCpan-4.0")
-    else:
-        raise ValueError("Unsupported", predictor_name)
+    cols = constant_data['cols']

-    start = time.time()
-    df = predictor.predict_peptides_dataframe(peptides)
-    print("Generated predictions for %d peptides x %d alleles in %0.2f sec." % (
-        len(peptides), len(alleles), (time.time() - start)))
-
-    results = {}
-    for (allele, sub_df) in df.groupby("allele"):
-        for col in cols:
-            results["%s %s" % (allele, col)] = sub_df[col].values.astype('float32')
-    return (work_item_num, results)
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
+
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+
+        if predictor_name == "netmhcpan4":
+            predictor = mhctools.NetMHCpan4(
+                alleles=alleles, program_name="netMHCpan-4.0")
+        else:
+            raise ValueError("Unsupported", predictor_name)
+
+        start = time.time()
+        df = predictor.predict_peptides_dataframe(peptides)
+        print("Predicted for %d peptides x %d alleles in %0.2f sec." % (
+            len(peptides), len(alleles), (time.time() - start)))
+
+        for (allele, sub_df) in df.groupby("allele"):
+            for col in cols:
+                result["%s %s" % (allele, col)] = (
+                    sub_df[col].values.astype('float32'))
+
+    return results


-def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=None):
+def do_predictions_mhcflurry(work_item_dicts, constant_data=None):
+    """
+    Each dict of work items should have keys: work_item_num, peptides, alleles
+    """
     # This may run on the cluster in a way that misses all top level imports,
     # so we have to re-import everything here.
     import time
@@ -409,22 +454,30 @@ def do_predictions_mhcflurry(work_item_num, peptides, alleles, constant_data=None):
     predictor = Class1AffinityPredictor.load(args.mhcflurry_models_dir)

-    start = time.time()
-    results = {}
-    peptides = EncodableSequences.create(peptides)
-    for (i, allele) in enumerate(alleles):
-        print("Processing allele %d / %d: %0.2f sec elapsed" % (
-            i + 1, len(alleles), time.time() - start))
-        for col in ["affinity"]:
-            results["%s %s" % (allele, col)] = predictor.predict(
-                peptides=peptides,
-                allele=allele,
-                throw=False,
-                model_kwargs={
-                    'batch_size': args.mhcflurry_batch_size,
-                }).astype('float32')
-    print("Done predicting in", time.time() - start, "sec")
-    return (work_item_num, results)
+    results = []
+    for (i, d) in enumerate(work_item_dicts):
+        work_item_num = d['work_item_num']
+        peptides = d['peptides']
+        alleles = d['alleles']
+
+        print("Processing work item", i + 1, "of", len(work_item_dicts))
+        result = {}
+        results.append((work_item_num, result))
+
+        start = time.time()
+        peptides = EncodableSequences.create(peptides)
+        for (i, allele) in enumerate(alleles):
+            print("Processing allele %d / %d: %0.2f sec elapsed" % (
+                i + 1, len(alleles), time.time() - start))
+            for col in ["affinity"]:
+                result["%s %s" % (allele, col)] = predictor.predict(
+                    peptides=peptides,
+                    allele=allele,
+                    throw=False,
+                    model_kwargs={
+                        'batch_size': args.mhcflurry_batch_size,
+                    }).astype('float32')
+        print("Done predicting in", time.time() - start, "sec")
+
+    return results


 if __name__ == '__main__':
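For reference, a sketch of how a task now flows through one of these worker functions. The alleles and peptides are made up for illustration, and a stub stands in for the real predictors so the data shapes are visible without model files:

# Hypothetical work items; each dict carries its original work_item_num so
# results can be matched back to rows of result_df regardless of task order.
work_item_dicts = [
    {'work_item_num': 0,
     'peptides': ['SIINFEKL', 'KVAELVHFL'],
     'alleles': ['HLA-A*02:01']},
    {'work_item_num': 3,
     'peptides': ['GILGFVFTL'],
     'alleles': ['HLA-A*02:01', 'HLA-B*07:02']},
]
task = {'work_item_dicts': work_item_dicts}

def do_predictions_stub(work_item_dicts, constant_data=None):
    # Same return shape as do_predictions_mhcflurry / do_predictions_mhctools:
    # a list of (work_item_num, {"<allele> <col>": predictions}) pairs.
    results = []
    for d in work_item_dicts:
        result = {
            "%s affinity" % allele: [0.0] * len(d['peptides'])
            for allele in d['alleles']
        }
        results.append((d['work_item_num'], result))
    return results

for (work_item_num, col_to_predictions) in do_predictions_stub(**task):
    print(work_item_num, sorted(col_to_predictions))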