From 3305b47eac10024c3d61e51542f4c090c9c5f893 Mon Sep 17 00:00:00 2001
From: Tim O'Donnell <timodonnell@gmail.com>
Date: Mon, 30 Sep 2019 17:11:39 -0400
Subject: [PATCH] Add HPC cluster parallelism to data_mass_spec_benchmark generation

---
 .../GENERATE.WITH_HPC_CLUSTER.sh              |  68 +++++
 .../data_mass_spec_benchmark/GENERATE.sh      |  43 ++-
 .../cluster_submit_script_header.mssm_hpc.lsf |  36 +++
 .../data_mass_spec_benchmark/run_mhcflurry.py | 252 ++++++++++++++++++
 .../write_allele_list.py                      |  43 +++
 5 files changed, 436 insertions(+), 6 deletions(-)
 create mode 100755 downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh
 create mode 100644 downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf
 create mode 100644 downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
 create mode 100644 downloads-generation/data_mass_spec_benchmark/write_allele_list.py

diff --git a/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh b/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh
new file mode 100755
index 00000000..132d78d3
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/GENERATE.WITH_HPC_CLUSTER.sh
@@ -0,0 +1,68 @@
+#!/bin/bash
+#
+#
+set -e
+set -x
+
+DOWNLOAD_NAME=data_mass_spec_benchmark
+SCRATCH_DIR=${TMPDIR-/tmp}/mhcflurry-downloads-generation
+SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
+SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
+export PYTHONUNBUFFERED=1
+
+mkdir -p "$SCRATCH_DIR"
+rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
+mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
+
+# Send stdout and stderr to a logfile included with the archive.
+exec >  >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
+exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
+
+# Log some environment info
+date
+pip freeze
+git status
+
+cd $SCRATCH_DIR/$DOWNLOAD_NAME
+
+cp $SCRIPT_DIR/write_proteome_peptides.py .
+cp $SCRIPT_DIR/run_mhcflurry.py .
+cp $SCRIPT_DIR/write_allele_list.py .
+
+
+PEPTIDES=$(mhcflurry-downloads path data_mass_spec_annotated)/annotated_ms.csv.bz2
+REFERENCES_DIR=$(mhcflurry-downloads path data_references)
+
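+# Reuse a precomputed proteome_peptides.csv.bz2 from a local path instead of
+# regenerating it; the generation command is kept below for reference.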
+#python write_proteome_peptides.py \
+#    "$PEPTIDES" \
+#    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
+#    --out proteome_peptides.csv
+#ls -lh proteome_peptides.csv
+#bzip2 proteome_peptides.csv
+ln -s ~/Dropbox/sinai/projects/201808-mhcflurry-pan/20190622-models/proteome_peptides.csv.bz2 proteome_peptides.csv.bz2
+
+python write_allele_list.py "$PEPTIDES" --out alleles.txt
+
+mkdir predictions
+
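+# Each work item is submitted as its own bsub job; failed jobs are retried up
+# to --cluster-max-retries times, and each generated submit script begins with
+# the LSF header template passed via --cluster-script-prefix-path.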
+for kind in with_mass_spec no_mass_spec
+do
+    python run_mhcflurry.py \
+        proteome_peptides.csv.bz2 \
+        --models-dir "$(mhcflurry-downloads path models_class1_pan)/models.$kind" \
+        --allele $(cat alleles.txt) \
+        --out "predictions/mhcflurry.$kind" \
+        --verbosity 1 \
+        --worker-log-dir "$SCRATCH_DIR/$DOWNLOAD_NAME" \
+        --cluster-parallelism \
+        --cluster-max-retries 15 \
+        --cluster-submit-command bsub \
+        --cluster-results-workdir ~/mhcflurry-scratch \
+        --cluster-script-prefix-path $SCRIPT_DIR/cluster_submit_script_header.mssm_hpc.lsf
+done
+
+cp $SCRIPT_ABSOLUTE_PATH .
+bzip2 LOG.txt
+RESULT="$SCRATCH_DIR/${DOWNLOAD_NAME}.$(date +%Y%m%d).tar.bz2"
+tar -cjf "$RESULT" *
+echo "Created archive: $RESULT"
diff --git a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
index 3995b089..a3fa7887 100755
--- a/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
+++ b/downloads-generation/data_mass_spec_benchmark/GENERATE.sh
@@ -26,16 +26,47 @@ git status
 cd $SCRATCH_DIR/$DOWNLOAD_NAME
 
 cp $SCRIPT_DIR/write_proteome_peptides.py .
+cp $SCRIPT_DIR/run_mhcflurry.py .
+cp $SCRIPT_DIR/write_allele_list.py .
+
+GPUS=$(nvidia-smi -L 2> /dev/null | wc -l) || GPUS=0
+echo "Detected GPUS: $GPUS"
+
+PROCESSORS=$(getconf _NPROCESSORS_ONLN)
+echo "Detected processors: $PROCESSORS"
+
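+# Default to one job per GPU when GPUs are present, otherwise a single job;
+# set NUM_JOBS in the environment to override.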
+if [ "$GPUS" -eq "0" ]; then
+    NUM_JOBS=${NUM_JOBS-1}
+else
+    NUM_JOBS=${NUM_JOBS-$GPUS}
+fi
+echo "Num jobs: $NUM_JOBS"
+
 
 PEPTIDES=$(mhcflurry-downloads path data_mass_spec_annotated)/annotated_ms.csv.bz2
 REFERENCES_DIR=$(mhcflurry-downloads path data_references)
 
-python write_proteome_peptides.py \
-    "$PEPTIDES" \
-    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
-    --out proteome_peptides.csv
-ls -lh proteome_peptides.csv
-bzip2 proteome_peptides.csv
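+# Reuse a precomputed proteome_peptides.csv.bz2 from a local path instead of
+# regenerating it; the generation command is kept below for reference.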
+#python write_proteome_peptides.py \
+#    "$PEPTIDES" \
+#    "${REFERENCES_DIR}/uniprot_proteins.csv.bz2" \
+#    --out proteome_peptides.csv
+#ls -lh proteome_peptides.csv
+#bzip2 proteome_peptides.csv
+ln -s ~/Dropbox/sinai/projects/201808-mhcflurry-pan/20190622-models/proteome_peptides.csv.bz2 proteome_peptides.csv.bz2
+
+python write_allele_list.py "$PEPTIDES" --out alleles.txt
+
+mkdir predictions
+
+for kind in with_mass_spec no_mass_spec
+do
+    python run_mhcflurry.py \
+        proteome_peptides.csv.bz2 \
+        --models-dir "$(mhcflurry-downloads path models_class1_pan)/models.$kind" \
+        --allele $(cat alleles.txt) \
+        --out "predictions/mhcflurry.$kind" \
+        --num-jobs $NUM_JOBS --max-tasks-per-worker 1 --gpus $GPUS --max-workers-per-gpu 1
+done
 
 cp $SCRIPT_ABSOLUTE_PATH .
 bzip2 LOG.txt
diff --git a/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf b/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf
new file mode 100644
index 00000000..efa3d10e
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/cluster_submit_script_header.mssm_hpc.lsf
@@ -0,0 +1,36 @@
+#!/bin/bash
+#BSUB -J MHCf-{work_item_num} # Job name
+#BSUB -P acc_nkcancer # allocation account or Unix group
+#BSUB -q gpu # queue
+#BSUB -R rusage[ngpus_excl_p=1]  # 1 exclusive GPU
+#BSUB -R span[hosts=1] # one node
+#BSUB -n 1 # number of compute cores
+#BSUB -W 46:00 # walltime in HH:MM
+#BSUB -R rusage[mem=30000] # memory requested, in MB
+#BSUB -o {work_dir}/%J.stdout # output log (%J : JobID)
+#BSUB -eo {work_dir}/STDERR # error log
+#BSUB -L /bin/bash # Initialize the execution environment
+#
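+# {work_item_num} and {work_dir} are template fields: mhcflurry's cluster
+# parallelism code fills them in per job before submission, so this file is a
+# script prefix rather than a complete submit script.
+#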
+
+set -e
+set -x
+
+echo "Subsequent stderr output redirected to stdout" >&2
+exec 2>&1
+
+export TMPDIR=/local/JOBS/mhcflurry-{work_item_num}
+export PATH=$HOME/.conda/envs/py36b/bin/:$PATH
+export PYTHONUNBUFFERED=1
+export KMP_SETTINGS=1
+
+free -m
+
+module add cuda/10.0.130 cudnn/7.1.1
+module list
+
+# python -c 'import tensorflow as tf ; print("GPU AVAILABLE" if tf.test.is_gpu_available() else "GPU NOT AVAILABLE")'
+
+env
+
+cd {work_dir}
+
diff --git a/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
new file mode 100644
index 00000000..e6ed769b
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/run_mhcflurry.py
@@ -0,0 +1,252 @@
+"""
+"""
+import argparse
+import os
+import signal
+import sys
+import time
+import traceback
+import collections
+from functools import partial
+
+import numpy
+import pandas
+
+from mhcnames import normalize_allele_name
+import tqdm  # progress bar
+tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
+
+from mhcflurry.class1_affinity_predictor import Class1AffinityPredictor
+from mhcflurry.encodable_sequences import EncodableSequences
+from mhcflurry.common import configure_logging, random_peptides, amino_acid_distribution
+from mhcflurry.local_parallelism import (
+    add_local_parallelism_args,
+    worker_pool_with_gpu_assignments_from_args,
+    call_wrapped_kwargs)
+from mhcflurry.cluster_parallelism import (
+    add_cluster_parallelism_args,
+    cluster_results_from_args)
+
+
+# To avoid pickling large objects when sending work to child processes, we
+# stash data in this global variable. Anything stored here before the worker
+# pool is created is inherited by the child processes on fork(), so large data
+# is shared with the workers through copy-on-write memory instead of being
+# serialized.
+GLOBAL_DATA = {}
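+#
+# A minimal sketch of the pattern (illustrative only; names are hypothetical):
+#
+#     GLOBAL_DATA["predictor"] = load_big_predictor()  # before the pool exists
+#     pool = multiprocessing.Pool(4)                   # children inherit GLOBAL_DATA
+#     results = pool.map(worker_using_GLOBAL_DATA, work_items)  # nothing large pickled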
+
+parser = argparse.ArgumentParser(usage=__doc__)
+
+parser.add_argument(
+    "input_peptides",
+    metavar="CSV",
+    help="CSV file with 'peptide' column")
+parser.add_argument(
+    "--models-dir",
+    metavar="DIR",
+    required=True,
+    help="Directory to read MHCflurry models")
+parser.add_argument(
+    "--allele",
+    default=None,
+    required=True,
+    nargs="+",
+    help="Alleles to predict")
+parser.add_argument(
+    "--chunk-size",
+    type=int,
+    default=1000000,
+    help="Num peptides per job. Default: %(default)s")
+parser.add_argument(
+    "--batch-size",
+    type=int,
+    default=4096,
+    help="Keras batch size for predictions. Default: %(default)s")
+parser.add_argument(
+    "--reuse-results",
+    metavar="DIR",
+    help="Reuse results from DIR")
+parser.add_argument(
+    "--out",
+    metavar="DIR",
+    help="Write results to DIR")
+parser.add_argument(
+    "--verbosity",
+    type=int,
+    help="Keras verbosity. Default: %(default)s",
+    default=0)
+
+add_local_parallelism_args(parser)
+add_cluster_parallelism_args(parser)
+
+
+def run(argv=sys.argv[1:]):
+    global GLOBAL_DATA
+
+    # On sigusr1 print stack trace
+    print("To show stack trace, run:\nkill -s USR1 %d" % os.getpid())
+    signal.signal(signal.SIGUSR1, lambda sig, frame: traceback.print_stack())
+
+    args = parser.parse_args(argv)
+
+    args.models_dir = os.path.abspath(args.models_dir)
+
+    configure_logging(verbose=args.verbosity > 1)
+
+    serial_run = not args.cluster_parallelism and args.num_jobs == 0
+
+    # It's important that we don't trigger a Keras import here since that breaks
+    # local parallelism (tensorflow backend). So we set optimization_level=0 if
+    # using local parallelism.
+    predictor = Class1AffinityPredictor.load(
+        args.models_dir,
+        optimization_level=None if serial_run or args.cluster_parallelism else 0,
+    )
+
+    alleles = [normalize_allele_name(a) for a in args.allele]
+    alleles = sorted(set(alleles))
+
+    peptides = pandas.read_csv(args.input_peptides).peptide.unique()
+    num_peptides = len(peptides)
+
+    print("Predictions for %d alleles x %d peptides." % (
+        len(alleles), num_peptides))
+
+    if not os.path.exists(args.out):
+        print("Creating", args.out)
+        os.mkdir(args.out)
+
+    # Write peptide and allele lists to out dir.
+    out_peptides = os.path.abspath(os.path.join(args.out, "peptides.csv"))
+    pandas.DataFrame({"peptide": peptides}).to_csv(out_peptides, index=False)
+    print("Wrote: ", out_peptides)
+    allele_to_file_path = dict(
+        (allele, "%s.npz" % (allele.replace("*", ""))) for allele in alleles)
+    out_alleles = os.path.abspath(os.path.join(args.out, "alleles.csv"))
+    pandas.DataFrame({
+        'allele': alleles,
+        'path': [allele_to_file_path[allele] for allele in alleles],
+    }).to_csv(out_alleles, index=False)
+    print("Wrote: ", out_alleles)
+
+    # Use ceiling division (and at least one chunk) so small inputs still work.
+    num_chunks = max(1, int(numpy.ceil(num_peptides / float(args.chunk_size))))
+    print("Split peptides into %d chunks" % num_chunks)
+    peptide_chunks = numpy.array_split(peptides, num_chunks)
+
+    GLOBAL_DATA["predictor"] = predictor
+    GLOBAL_DATA["args"] = {
+        'verbose': args.verbosity > 0,
+        'model_kwargs': {
+            'batch_size': args.batch_size,
+        }
+    }
+
+    work_items = []
+    for allele in alleles:
+        for (chunk_index, chunk_peptides) in enumerate(peptide_chunks):
+            work_item = {
+                'allele': allele,
+                'chunk_index': chunk_index,
+                'chunk_peptides': chunk_peptides,
+            }
+            work_items.append(work_item)
+    print("Work items: ", len(work_items))
+
+    worker_pool = None
+    start = time.time()
+    if serial_run:
+        # Serial run
+        print("Running in serial.")
+        results = (
+            do_predictions(**item) for item in work_items)
+    elif args.cluster_parallelism:
+        # Run in separate processes on an HPC cluster.
+        print("Running on cluster.")
+        results = cluster_results_from_args(
+            args,
+            work_function=do_predictions,
+            work_items=work_items,
+            constant_data=GLOBAL_DATA,
+            result_serialization_method="pickle",
+            clear_constant_data=True)
+    else:
+        worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+        print("Worker pool", worker_pool)
+        assert worker_pool is not None
+        results = worker_pool.imap_unordered(
+            partial(call_wrapped_kwargs, do_predictions),
+            work_items,
+            chunksize=1)
+
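+    # Chunk results may arrive in any order. Collect them per allele; once an
+    # allele has all of its chunks, concatenate in chunk order, write the
+    # predictions, and free the memory.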
+    allele_to_chunk_index_to_predictions = {}
+    for allele in alleles:
+        allele_to_chunk_index_to_predictions[allele] = {}
+
+    for (allele, chunk_index, predictions) in tqdm.tqdm(
+            results, total=len(work_items)):
+
+        chunk_index_to_predictions = allele_to_chunk_index_to_predictions[allele]
+
+        assert chunk_index not in chunk_index_to_predictions
+        chunk_index_to_predictions[chunk_index] = predictions
+
+        if len(allele_to_chunk_index_to_predictions[allele]) == num_chunks:
+            chunk_predictions = sorted(chunk_index_to_predictions.items())
+            assert [i for (i, _) in chunk_predictions] == list(range(num_chunks))
+            predictions = numpy.concatenate([
+                predictions for (_, predictions) in chunk_predictions
+            ])
+            assert len(predictions) == num_peptides
+            out_path = os.path.join(args.out, allele.replace("*", "")) + ".npz"
+            out_path = os.path.abspath(out_path)
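+            # numpy.savez with a positional argument stores the array under
+            # the default key 'arr_0'.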
+            numpy.savez(out_path, predictions)
+            print("Wrote:", out_path)
+
+            del allele_to_chunk_index_to_predictions[allele]
+
+    assert not allele_to_chunk_index_to_predictions, (
+        "Not all results written: ", allele_to_chunk_index_to_predictions)
+
+    if worker_pool:
+        worker_pool.close()
+        worker_pool.join()
+
+    prediction_time = time.time() - start
+    print("Done generating predictions in %0.2f min." % (
+        prediction_time / 60.0))
+
+
+def do_predictions(allele, chunk_index, chunk_peptides, constant_data=GLOBAL_DATA):
+    return predict_for_allele(
+        allele,
+        chunk_index,
+        chunk_peptides,
+        constant_data['predictor'],
+        **constant_data["args"])
+
+
+def predict_for_allele(
+        allele,
+        chunk_index,
+        chunk_peptides,
+        predictor,
+        verbose=False,
+        model_kwargs={}):
+    if verbose:
+        print("Predicting", allele)
+    predictor.optimize()  # since we may have loaded with optimization_level=0
+    start = time.time()
+    predictions = predictor.predict(
+        peptides=chunk_peptides,
+        allele=allele,
+        verbose=verbose,
+        throw=False,
+        model_kwargs=model_kwargs).astype('float32')
+    if verbose:
+        print("Done predicting", allele, "in", time.time() - start, "sec")
+
+    return (allele, chunk_index, predictions)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/downloads-generation/data_mass_spec_benchmark/write_allele_list.py b/downloads-generation/data_mass_spec_benchmark/write_allele_list.py
new file mode 100644
index 00000000..39712e58
--- /dev/null
+++ b/downloads-generation/data_mass_spec_benchmark/write_allele_list.py
@@ -0,0 +1,43 @@
+"""
+"""
+import sys
+import argparse
+import os
+
+import pandas
+import tqdm  # progress bar
+tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
+
+
+parser = argparse.ArgumentParser(usage=__doc__)
+
+parser.add_argument(
+    "input",
+    metavar="FILE.csv",
+    help="CSV of annotated mass spec hits")
+parser.add_argument(
+    "--out",
+    metavar="OUT.txt",
+    help="Out file path")
+
+
+def run():
+    args = parser.parse_args(sys.argv[1:])
+
+    df = pandas.read_csv(args.input)
+    print("Read peptides", df.shape, *df.columns.tolist())
+
+    df = df.loc[df.mhc_class == "I"]
+
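+    # Each 'hla' value holds a space-separated list of alleles; take the union
+    # across all rows.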
+    hla_sets = df.hla.unique()
+    all_hla = set()
+    for hla_set in hla_sets:
+        all_hla.update(hla_set.split())
+
+    all_hla = pandas.Series(sorted(all_hla))
+    all_hla.to_csv(args.out, index=False, header=False)
+    print("Wrote [%d alleles]: %s" % (len(all_hla), os.path.abspath(args.out)))
+
+
+if __name__ == '__main__':
+    run()
-- 
GitLab