diff --git a/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh b/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh
new file mode 100755
index 0000000000000000000000000000000000000000..132d78d3ad4040c16f104c10d9a9ab1cc70d883e
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh
@@ -0,0 +1,68 @@
+#!/bin/bash
+#
+# Generate the data_mass_spec_benchmark download using an LSF HPC cluster.
+set -e
+set -x
+
+DOWNLOAD_NAME=data_mass_spec_benchmark
+SCRATCH_DIR=${TMPDIR-/tmp}/mhcflurry-downloads-generation
+SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
+SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
+export PYTHONUNBUFFERED=1
+
+mkdir -p "$SCRATCH_DIR"
+rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
+mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
+
+# Send stdout and stderr to a logfile included with the archive.
+#exec > >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
+#exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
+
+# Log some environment info
+date
+pip freeze
+git status
+
+cd $SCRATCH_DIR/$DOWNLOAD_NAME
+
+cp $SCRIPT_DIR/write_proteome_peptides.py .
+cp $SCRIPT_DIR/run_mhcflurry.py .
+cp $SCRIPT_DIR/write_allele_list.py .
+
+
+PEPTIDES=$(mhcflurry-downloads path data_mass_spec_annotated)/annotated_ms.csv.bz2
+REFERENCES_DIR=$(mhcflurry-downloads path data_references)
+
+#python write_proteome_peptides.py \
+#    "$PEPTIDES" \
+#    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
+#    --out proteome_peptides.csv
+#ls -lh proteome_peptides.csv
+#bzip2 proteome_peptides.csv
+ln -s ~/Dropbox/sinai/projects/201808-mhcflurry-pan/20190622-models/proteome_peptides.csv.bz2 proteome_peptides.csv.bz2
+
+python write_allele_list.py "$PEPTIDES" --out alleles.txt
+
+mkdir predictions
+
+for kind in with_mass_spec no_mass_spec
+do
+    python run_mhcflurry.py \
+        proteome_peptides.csv.bz2 \
+        --models-dir "$(mhcflurry-downloads path models_class1_pan)/models.$kind" \
+        --allele $(cat alleles.txt) \
+        --out "predictions/mhcflurry.$kind" \
+        --verbosity 1 \
+        --worker-log-dir "$SCRATCH_DIR/$DOWNLOAD_NAME" \
+        --cluster-parallelism \
+        --cluster-max-retries 15 \
+        --cluster-submit-command bsub \
+        --cluster-results-workdir ~/mhcflurry-scratch \
+        --cluster-script-prefix-path $SCRIPT_DIR/cluster_submit_script_header.mssm_hpc.lsf
+done
+
+cp $SCRIPT_ABSOLUTE_PATH .
+bzip2 LOG.txt
+RESULT="$SCRATCH_DIR/${DOWNLOAD_NAME}.$(date +%Y%m%d).tar.bz2"
+tar -cjf "$RESULT" *
+echo "Created archive: $RESULT"
diff --git a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
index 3995b089ae75ccf5d97450b1c01d48a7cc920d22..a3fa7887acab3b663d62505a709bb19014d8a000 100755
--- a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
+++ b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
@@ -26,16 +26,47 @@ git status
 cd $SCRATCH_DIR/$DOWNLOAD_NAME
 
 cp $SCRIPT_DIR/write_proteome_peptides.py .
+cp $SCRIPT_DIR/run_mhcflurry.py .
+cp $SCRIPT_DIR/write_allele_list.py .
+
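+# Size local parallelism from the detected hardware: one worker per GPU if any
+# are present, otherwise a single CPU worker (set NUM_JOBS to override).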
+GPUS=$(nvidia-smi -L 2> /dev/null | wc -l) || GPUS=0
+echo "Detected GPUS: $GPUS"
+
+PROCESSORS=$(getconf _NPROCESSORS_ONLN)
+echo "Detected processors: $PROCESSORS"
+
+if [ "$GPUS" -eq "0" ]; then
+    NUM_JOBS=${NUM_JOBS-1}
+else
+    NUM_JOBS=${NUM_JOBS-$GPUS}
+fi
+echo "Num jobs: $NUM_JOBS"
+
 
 PEPTIDES=$(mhcflurry-downloads path data_mass_spec_annotated)/annotated_ms.csv.bz2
 REFERENCES_DIR=$(mhcflurry-downloads path data_references)
 
-python write_proteome_peptides.py \
-    "$PEPTIDES" \
-    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
-    --out proteome_peptides.csv
-ls -lh proteome_peptides.csv
-bzip2 proteome_peptides.csv
+#python write_proteome_peptides.py \
+#    "$PEPTIDES" \
+#    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
+#    --out proteome_peptides.csv
+#ls -lh proteome_peptides.csv
+#bzip2 proteome_peptides.csv
+ln -s ~/Dropbox/sinai/projects/201808-mhcflurry-pan/20190622-models/proteome_peptides.csv.bz2 proteome_peptides.csv.bz2
+
+python write_allele_list.py "$PEPTIDES" --out alleles.txt
+
+mkdir predictions
+
+for kind in with_mass_spec no_mass_spec
+do
+    python run_mhcflurry.py \
+        proteome_peptides.csv.bz2 \
+        --models-dir "$(mhcflurry-downloads path models_class1_pan)/models.$kind" \
+        --allele $(cat alleles.txt) \
+        --out "predictions/mhcflurry.$kind" \
+        --num-jobs $NUM_JOBS --max-tasks-per-worker 1 --gpus $GPUS --max-workers-per-gpu 1
+done
 
 cp $SCRIPT_ABSOLUTE_PATH .
 bzip2 LOG.txt
diff --git a/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf b/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf
new file mode 100644
index 0000000000000000000000000000000000000000..efa3d10e75a65c830a3134387eb5f4fdd4136668
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf
@@ -0,0 +1,36 @@
+#!/bin/bash
+#BSUB -J MHCf-{work_item_num}  # Job name
+#BSUB -P acc_nkcancer  # allocation account or Unix group
+#BSUB -q gpu  # queue
+#BSUB -R rusage[ngpus_excl_p=1]  # 1 exclusive GPU
+#BSUB -R span[hosts=1]  # one node
+#BSUB -n 1  # number of compute cores
+#BSUB -W 46:00  # walltime in HH:MM
+#BSUB -R rusage[mem=30000]  # mb memory requested
+#BSUB -o {work_dir}/%J.stdout  # output log (%J : JobID)
+#BSUB -eo {work_dir}/STDERR  # error log
+#BSUB -L /bin/bash  # Initialize the execution environment
+#
+
+set -e
+set -x
+
+echo "Subsequent stderr output redirected to stdout" >&2
+exec 2>&1
+
+export TMPDIR=/local/JOBS/mhcflurry-{work_item_num}
+export PATH=$HOME/.conda/envs/py36b/bin/:$PATH
+export PYTHONUNBUFFERED=1
+export KMP_SETTINGS=1
+
+free -m
+
+module add cuda/10.0.130 cudnn/7.1.1
+module list
+
+# python -c 'import tensorflow as tf ; print("GPU AVAILABLE" if tf.test.is_gpu_available() else "GPU NOT AVAILABLE")'
+
+env
+
+cd {work_dir}
+
diff --git a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
new file mode 100644
index 0000000000000000000000000000000000000000..e6ed769b917b119d48b859626f43e6a97c02cb5c
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
@@ -0,0 +1,252 @@
+"""Generate MHCflurry predictions for a set of peptides across many alleles,
+writing one .npz array of predictions per allele."""
+import argparse
+import os
+import signal
+import sys
+import time
+import traceback
+import collections
+from functools import partial
+
+import numpy
+import pandas
+
+from mhcnames import normalize_allele_name
+import tqdm  # progress bar
+tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
+
+from mhcflurry.class1_affinity_predictor import Class1AffinityPredictor
+from mhcflurry.encodable_sequences import EncodableSequences
+from mhcflurry.common import configure_logging, random_peptides, amino_acid_distribution
+from mhcflurry.local_parallelism import (
+    add_local_parallelism_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped_kwargs)
+from mhcflurry.cluster_parallelism import (
+    add_cluster_parallelism_args,
+    cluster_results_from_args)
+
+
+# To avoid pickling large matrices to send to child processes when running in
+# parallel, we use this global variable as a place to store data. Data that is
+# stored here before the worker pool is created will be inherited by the child
+# processes upon fork(), allowing us to share large data with the workers via
+# shared memory.
+GLOBAL_DATA = {}
+
+parser = argparse.ArgumentParser(usage=__doc__)
+
+parser.add_argument(
+    "input_peptides",
+    metavar="CSV",
+    help="CSV file with 'peptide' column")
+parser.add_argument(
+    "--models-dir",
+    metavar="DIR",
+    required=True,
+    help="Directory to read MHCflurry models")
+parser.add_argument(
+    "--allele",
+    default=None,
+    required=True,
+    nargs="+",
+    help="Alleles to predict")
+parser.add_argument(
+    "--chunk-size",
+    type=int,
+    default=1000000,
+    help="Num peptides per job. Default: %(default)s")
+parser.add_argument(
+    "--batch-size",
+    type=int,
+    default=4096,
+    help="Keras batch size for predictions. Default: %(default)s")
+parser.add_argument(
+    "--reuse-results",
+    metavar="DIR",
+    help="Reuse results from DIR")
+parser.add_argument(
+    "--out",
+    metavar="DIR",
+    help="Write results to DIR")
+parser.add_argument(
+    "--verbosity",
+    type=int,
+    help="Keras verbosity. Default: %(default)s",
+    default=0)
+
+add_local_parallelism_args(parser)
+add_cluster_parallelism_args(parser)
+
+
+def run(argv=sys.argv[1:]):
+    global GLOBAL_DATA
+
+    # On SIGUSR1, print a stack trace.
+    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
+    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())
+
+    args = parser.parse_args(argv)
+
+    args.models_dir = os.path.abspath(args.models_dir)
+
+    configure_logging(verbose=args.verbosity > 1)
+
+    serial_run = not args.cluster_parallelism and args.num_jobs == 0
+
+    # It's important that we don't trigger a Keras import here since that breaks
+    # local parallelism (tensorflow backend). So we set optimization_level=0 if
+    # using local parallelism.
+    predictor = Class1AffinityPredictor.load(
+        args.models_dir,
+        optimization_level=None if serial_run or args.cluster_parallelism else 0,
+    )
+
+    alleles = [normalize_allele_name(a) for a in args.allele]
+    alleles = sorted(set(alleles))
+
+    peptides = pandas.read_csv(args.input_peptides).peptide.unique()
+    num_peptides = len(peptides)
+
+    print("Predictions for %d alleles x %d peptides." % (
+        len(alleles), num_peptides))
+
+    if not os.path.exists(args.out):
+        print("Creating", args.out)
+        os.mkdir(args.out)
+
+    # Write peptide and allele lists to out dir.
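+    # alleles.csv maps each allele to the .npz file its predictions will be
+    # written to; rows in each .npz follow the row order of peptides.csv.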
+    out_peptides = os.path.abspath(os.path.join(args.out, "peptides.csv"))
+    pandas.DataFrame({"peptide": peptides}).to_csv(out_peptides, index=False)
+    print("Wrote: ", out_peptides)
+    allele_to_file_path = dict(
+        (allele, "%s.npz" % (allele.replace("*", ""))) for allele in alleles)
+    out_alleles = os.path.abspath(os.path.join(args.out, "alleles.csv"))
+    pandas.DataFrame({
+        'allele': alleles,
+        'path': [allele_to_file_path[allele] for allele in alleles],
+    }).to_csv(out_alleles, index=False)
+    print("Wrote: ", out_alleles)
+
+    num_chunks = max(1, int(len(peptides) / args.chunk_size))
+    print("Split peptides into %d chunks" % num_chunks)
+    peptide_chunks = numpy.array_split(peptides, num_chunks)
+
+    GLOBAL_DATA["predictor"] = predictor
+    GLOBAL_DATA["args"] = {
+        'verbose': args.verbosity > 0,
+        'model_kwargs': {
+            'batch_size': args.batch_size,
+        }
+    }
+
+    work_items = []
+    for allele in alleles:
+        for (chunk_index, chunk_peptides) in enumerate(peptide_chunks):
+            work_item = {
+                'allele': allele,
+                'chunk_index': chunk_index,
+                'chunk_peptides': chunk_peptides,
+            }
+            work_items.append(work_item)
+    print("Work items: ", len(work_items))
+
+    worker_pool = None
+    start = time.time()
+    if serial_run:
+        # Serial run
+        print("Running in serial.")
+        results = (
+            do_predictions(**item) for item in work_items)
+    elif args.cluster_parallelism:
+        # Run using separate processes on an HPC cluster.
+        print("Running on cluster.")
+        results = cluster_results_from_args(
+            args,
+            work_function=do_predictions,
+            work_items=work_items,
+            constant_data=GLOBAL_DATA,
+            result_serialization_method="pickle",
+            clear_constant_data=True)
+    else:
+        worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+        print("Worker pool", worker_pool)
+        assert worker_pool is not None
+        results = worker_pool.imap_unordered(
+            partial(call_wrapped_kwargs, do_predictions),
+            work_items,
+            chunksize=1)
+
+    allele_to_chunk_index_to_predictions = {}
+    for allele in alleles:
+        allele_to_chunk_index_to_predictions[allele] = {}
+
+    for (allele, chunk_index, predictions) in tqdm.tqdm(
+            results, total=len(work_items)):
+
+        chunk_index_to_predictions = allele_to_chunk_index_to_predictions[allele]
+
+        assert chunk_index not in chunk_index_to_predictions
+        chunk_index_to_predictions[chunk_index] = predictions
+
+        if len(allele_to_chunk_index_to_predictions[allele]) == num_chunks:
+            chunk_predictions = sorted(chunk_index_to_predictions.items())
+            assert [i for (i, _) in chunk_predictions] == list(range(num_chunks))
+            predictions = numpy.concatenate([
+                predictions for (_, predictions) in chunk_predictions
+            ])
+            assert len(predictions) == num_peptides
+            out_path = os.path.join(args.out, allele.replace("*", "")) + ".npz"
+            out_path = os.path.abspath(out_path)
+            numpy.savez(out_path, predictions)
+            print("Wrote:", out_path)
+
+            del allele_to_chunk_index_to_predictions[allele]
+
+    assert not allele_to_chunk_index_to_predictions, (
+        "Not all results written: ", allele_to_chunk_index_to_predictions)
+
+    if worker_pool:
+        worker_pool.close()
+        worker_pool.join()
+
+    prediction_time = time.time() - start
+    print("Done generating predictions in %0.2f min." % (
+        prediction_time / 60.0))
+
+
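+# Worker entry point. constant_data defaults to the module-level GLOBAL_DATA so
+# that forked local workers inherit the loaded predictor without pickling it.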
+def do_predictions(allele, chunk_index, chunk_peptides, constant_data=GLOBAL_DATA):
+    return predict_for_allele(
+        allele,
+        chunk_index,
+        chunk_peptides,
+        constant_data['predictor'],
+        **constant_data["args"])
+
+
+def predict_for_allele(
+        allele,
+        chunk_index,
+        chunk_peptides,
+        predictor,
+        verbose=False,
+        model_kwargs={}):
+    if verbose:
+        print("Predicting", allele)
+    predictor.optimize()  # since we may have loaded with optimization_level=0
+    start = time.time()
+    predictions = predictor.predict(
+        peptides=chunk_peptides,
+        allele=allele,
+        verbose=verbose,
+        throw=False,
+        model_kwargs=model_kwargs).astype('float32')
+    if verbose:
+        print("Done predicting", allele, "in", time.time() - start, "sec")
+
+    return (allele, chunk_index, predictions)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/downloads-generation/data_mass_spec_benchmark/write_allele_list.py b/downloads-generation/data_mass_spec_benchmark/write_allele_list.py
new file mode 100644
index 0000000000000000000000000000000000000000..39712e5832b5267a68a08a106a9eb31f4a477b4b
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/write_allele_list.py
@@ -0,0 +1,43 @@
+"""Extract the unique MHC class I alleles from a CSV of annotated mass spec
+hits and write them, one per line, to a text file."""
+import sys
+import argparse
+import os
+
+import pandas
+import tqdm  # progress bar
+tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
+
+
+parser = argparse.ArgumentParser(usage=__doc__)
+
+parser.add_argument(
+    "input",
+    metavar="FILE.csv",
+    help="CSV of annotated mass spec hits")
+parser.add_argument(
+    "--out",
+    metavar="OUT.txt",
+    help="Out file path")
+
+
+def run():
+    args = parser.parse_args(sys.argv[1:])
+
+    df = pandas.read_csv(args.input)
+    print("Read peptides", df.shape, *df.columns.tolist())
+
+    df = df.loc[df.mhc_class == "I"]
+
+    hla_sets = df.hla.unique()
+    all_hla = set()
+    for hla_set in hla_sets:
+        all_hla.update(hla_set.split())
+
+    all_hla = pandas.Series(sorted(all_hla))
+    all_hla.to_csv(args.out, index=False, header=False)
+    print("Wrote [%d alleles]: %s" % (len(all_hla), os.path.abspath(args.out)))
+
+
+if __name__ == '__main__':
+    run()
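For reference, a minimal sketch of how the outputs written by run_mhcflurry.py can be reassembled into a peptides-by-alleles matrix. It assumes one of the predictions/mhcflurry.* directories produced by GENERATE.sh; the OUTPUT_DIR value is a placeholder, and "arr_0" is simply numpy's default key for an array passed positionally to numpy.savez.

    # Sketch: reassemble run_mhcflurry.py output into a peptide x allele DataFrame.
    # OUTPUT_DIR is a placeholder; point it at e.g. predictions/mhcflurry.no_mass_spec.
    import os

    import numpy
    import pandas

    OUTPUT_DIR = "predictions/mhcflurry.no_mass_spec"

    peptides = pandas.read_csv(os.path.join(OUTPUT_DIR, "peptides.csv")).peptide.values
    manifest = pandas.read_csv(os.path.join(OUTPUT_DIR, "alleles.csv"))

    columns = {}
    for _, row in manifest.iterrows():
        with numpy.load(os.path.join(OUTPUT_DIR, row.path)) as npz:
            # numpy.savez was called with a single positional array, so the key is arr_0.
            columns[row.allele] = npz["arr_0"]

    result_df = pandas.DataFrame(columns, index=peptides)
    print(result_df.shape)
    print(result_df.head())

This mirrors how run_mhcflurry.py writes its results: one float32 vector per allele, indexed in the same order as peptides.csv.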