Commit f56a579c authored by Tim O'Donnell

cluster parallelism

parent b82832a1
#!/bin/bash
#
# Train pan-allele MHCflurry Class I models.
#
# Uses an HPC cluster (the Mount Sinai "chimera" cluster, which uses the LSF
# job scheduler). This would need to be modified for other sites.
#
set -e
set -x
DOWNLOAD_NAME=models_class1_pan_unselected
SCRATCH_DIR=${TMPDIR-/tmp}/mhcflurry-downloads-generation
SCRIPT_ABSOLUTE_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"
SCRIPT_DIR=$(dirname "$SCRIPT_ABSOLUTE_PATH")
mkdir -p "$SCRATCH_DIR"
rm -rf "$SCRATCH_DIR/$DOWNLOAD_NAME"
mkdir "$SCRATCH_DIR/$DOWNLOAD_NAME"
# Send stdout and stderr to a logfile included with the archive.
exec > >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt")
exec 2> >(tee -ia "$SCRATCH_DIR/$DOWNLOAD_NAME/LOG.txt" >&2)
# Log some environment info
date
pip freeze
git status
cd "$SCRATCH_DIR/$DOWNLOAD_NAME"
cp "$SCRIPT_DIR/generate_hyperparameters.py" .
python generate_hyperparameters.py > hyperparameters.yaml
for kind in with_mass_spec no_mass_spec
do
    mhcflurry-class1-train-pan-allele-models \
        --data "$(mhcflurry-downloads path data_curated)/curated_training_data.${kind}.csv.bz2" \
        --allele-sequences "$(mhcflurry-downloads path allele_sequences)/allele_sequences.csv" \
        --pretrain-data "$(mhcflurry-downloads path random_peptide_predictions)/predictions.csv.bz2" \
        --held-out-measurements-per-allele-fraction-and-max 0.25 100 \
        --ensemble-size 4 \
        --hyperparameters hyperparameters.yaml \
        --out-models-dir models.${kind} \
        --worker-log-dir "$SCRATCH_DIR/$DOWNLOAD_NAME" \
        --verbosity 1 \
        --cluster-parallelism \
        --cluster-submit-command bsub \
        --cluster-results-workdir ~/mhcflurry-model-training \
        --cluster-script-prefix-path ./cluster_submit_script_header.mssm_hpc.lsf
done
cp "$SCRIPT_ABSOLUTE_PATH" .
bzip2 LOG.txt
for i in LOG-worker.*.txt ; do if [ -f "$i" ] ; then bzip2 "$i" ; fi ; done
tar -cjf "../${DOWNLOAD_NAME}.tar.bz2" *
echo "Created archive: $SCRATCH_DIR/${DOWNLOAD_NAME}.tar.bz2"
#!/bin/bash
#BSUB -J mhcf-{work_item_num} # Job name
#BSUB -P acc_nkcancer # allocation account or Unix group
#BSUB -q gpu # queue
#BSUB -R span[hosts=1] # one node
#BSUB -n 1 # number of compute cores
#BSUB -W 36:00 # walltime in HH:MM
#BSUB -R rusage[mem=60000] # memory requested (MB)
#BSUB -o {work_dir}/%J.stdout # output log (%J : JobID)
#BSUB -eo {work_dir}/%J.stderr # error log
#BSUB -L /bin/bash # Initialize the execution environment
set -e
free -m   # log available memory on the compute node
cd {work_dir}
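The {work_item_num} and {work_dir} fields mark the file above as a template: the brace syntax suggests the cluster-parallelism code fills it in per work item with Python's str.format and then submits the result via the configured --cluster-submit-command. A minimal sketch of that submission step, in which the function name, file layout, and worker command line are all assumptions rather than the actual mhcflurry.cluster_parallelism API:

# Hypothetical sketch of instantiating and submitting the template above.
# Function name, file layout, and worker command line are assumptions.
import os
import subprocess

def submit_work_item(prefix_path, work_dir, work_item_num, submit_command="bsub"):
    # Fill in the {work_dir} / {work_item_num} placeholders.
    with open(prefix_path) as f:
        header = f.read().format(work_dir=work_dir, work_item_num=work_item_num)

    # Append the command the worker should run on the node (assumed layout).
    body = "_mhcflurry-cluster-worker-entry-point %s/item.pkl\n" % work_dir

    script_path = os.path.join(work_dir, "submit.%d.lsf" % work_item_num)
    with open(script_path, "w") as f:
        f.write(header + body)

    # bsub reads the job script on stdin; the #BSUB directives only take
    # effect when the script is submitted this way.
    with open(script_path) as f:
        subprocess.check_call([submit_command], stdin=f)

Under these assumptions, pointing --cluster-submit-command at a local shell instead of bsub would exercise the same code path without a cluster.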
@@ -15,8 +15,8 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped)
@@ -54,7 +54,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)

-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)


 def run(argv=sys.argv[1:]):
     global GLOBAL_DATA
...
@@ -13,8 +13,8 @@ import numpy
 from .common import set_keras_backend


-def add_worker_pool_args(parser):
-    group = parser.add_argument_group("Worker pool")
+def add_local_parallelism_args(parser):
+    group = parser.add_argument_group("Local parallelism")
     group.add_argument(
         "--num-jobs",
...
@@ -22,7 +22,7 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .common import configure_logging, random_peptides
-from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
+from .local_parallelism import worker_pool_with_gpu_assignments_from_args, add_local_parallelism_args
 from .regression_target import from_ic50
@@ -176,7 +176,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)

-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)


 def run(argv=sys.argv[1:]):
...
@@ -22,7 +22,7 @@ from .class1_affinity_predictor import Class1AffinityPredictor
 from .encodable_sequences import EncodableSequences
 from .allele_encoding import AlleleEncoding
 from .common import configure_logging, random_peptides
-from .parallelism import worker_pool_with_gpu_assignments_from_args, add_worker_pool_args
+from .local_parallelism import worker_pool_with_gpu_assignments_from_args, add_local_parallelism_args
 from .regression_target import from_ic50
@@ -82,7 +82,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)

-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)


 def mse(
...
@@ -21,8 +21,8 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped_kwargs)
 from .hyperparameters import HyperparameterDefaults
@@ -122,7 +122,7 @@ parser.add_argument(
     help="Keras verbosity. Default: %(default)s",
     default=0)

-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)


 TRAIN_DATA_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(
     subset="all",
...
@@ -10,6 +10,8 @@ import traceback
 import random
 import pprint
 import hashlib
+import pickle
+import subprocess
 from functools import partial

 import numpy
@@ -22,10 +24,13 @@ tqdm.monitor_interval = 0  # see https://github.com/tqdm/tqdm/issues/481
 from .class1_affinity_predictor import Class1AffinityPredictor
 from .class1_neural_network import Class1NeuralNetwork
 from .common import configure_logging
-from .parallelism import (
-    add_worker_pool_args,
+from .local_parallelism import (
+    add_local_parallelism_args,
     worker_pool_with_gpu_assignments_from_args,
     call_wrapped_kwargs)
+from .cluster_parallelism import (
+    add_cluster_parallelism_args,
+    cluster_results_from_args)
 from .allele_encoding import AlleleEncoding
 from .encodable_sequences import EncodableSequences
@@ -121,8 +126,8 @@ parser.add_argument(
     default=False,
     help="Launch python debugger on error")

-add_worker_pool_args(parser)
+add_local_parallelism_args(parser)
+add_cluster_parallelism_args(parser)


 def assign_folds(df, num_folds, held_out_fraction, held_out_max):
     result_df = pandas.DataFrame(index=df.index)
@@ -335,9 +340,6 @@ def main(args):
     start = time.time()

-    worker_pool = worker_pool_with_gpu_assignments_from_args(args)
-    print("Worker pool", worker_pool)
-
     # The estimated time to completion is more accurate if we randomize
     # the order of the work.
     random.shuffle(work_items)
@@ -345,15 +347,41 @@
         item['work_item_num'] = work_item_num
         item['num_work_items'] = len(work_items)

-    if worker_pool:
-        print("Processing %d work items in parallel." % len(work_items))
-        assert not serial_run
-        results_generator = worker_pool.imap_unordered(
-            partial(call_wrapped_kwargs, train_model),
-            work_items,
-            chunksize=1)
+    if args.cluster_parallelism:
+        # Run using separate processes on an HPC cluster.
+        results_generator = cluster_results_from_args(
+            args,
+            work_function=train_model,
+            work_items=work_items,
+            constant_data=GLOBAL_DATA,
+            result_serialization_method="save_predictor")
+        worker_pool = None
+    else:
+        worker_pool = worker_pool_with_gpu_assignments_from_args(args)
+        print("Worker pool", worker_pool)
+
+        if worker_pool:
+            print("Processing %d work items in parallel." % len(work_items))
+            assert not serial_run
+            results_generator = worker_pool.imap_unordered(
+                partial(call_wrapped_kwargs, train_model),
+                work_items,
+                chunksize=1)
+        else:
+            # Run in serial. In this case, every worker is passed the same
+            # predictor, which it adds models to, so no merging is required.
+            # It also saves as it goes, so no saving is required at the end.
+            print("Processing %d work items in serial." % len(work_items))
+            assert serial_run
+            for _ in tqdm.trange(len(work_items)):
+                item = work_items.pop(0)  # pop rather than iterate, to free memory
+                work_predictor = train_model(**item)
+                assert work_predictor is predictor
+            assert not work_items
+            results_generator = None
+
+    if results_generator:
         unsaved_predictors = []
         last_save_time = time.time()
         for new_predictor in tqdm.tqdm(results_generator, total=len(work_items)):
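The dispatch above now selects among three modes: cluster jobs, a local worker pool, or serial execution, with the first two feeding the same results_generator consumer. The new pickle and subprocess imports suggest cluster_results_from_args serializes each work item together with constant_data, submits one job per item, and yields trained predictors as the jobs write results to the shared workdir. A sketch of what such a consuming generator could look like; the result file name, polling interval, and pickle serialization are invented for illustration (the real call requests result_serialization_method="save_predictor"):

# Hypothetical sketch of a cluster results generator. Result-file naming,
# polling, and pickle serialization are assumptions.
import os
import pickle
import time

def iter_cluster_results(work_dirs, poll_seconds=60):
    pending = set(work_dirs)
    while pending:
        done = [d for d in pending
                if os.path.exists(os.path.join(d, "result.pkl"))]
        if not done:
            time.sleep(poll_seconds)  # wait for cluster jobs to write results
            continue
        for work_dir in done:
            pending.discard(work_dir)
            with open(os.path.join(work_dir, "result.pkl"), "rb") as f:
                yield pickle.load(f)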
@@ -379,18 +407,6 @@ def main(args):
             predictor.merge_in_place(unsaved_predictors)

-    else:
-        # Run in serial. In this case, every worker is passed the same predictor,
-        # which it adds models to, so no merging is required. It also saves
-        # as it goes so no saving is required at the end.
-        print("Processing %d work items in serial." % len(work_items))
-        assert serial_run
-        for _ in tqdm.trange(len(work_items)):
-            item = work_items.pop(0)  # want to keep freeing up memory
-            work_predictor = train_model(**item)
-            assert work_predictor is predictor
-        assert not work_items
-
     print("Saving final predictor to: %s" % args.out_models_dir)
     predictor.save(args.out_models_dir)  # write all models just to be sure
     print("Done.")
@@ -422,12 +438,13 @@ def train_model(
         verbose,
         progress_print_interval,
         predictor,
-        save_to):
+        save_to,
+        constant_data=GLOBAL_DATA):

-    df = GLOBAL_DATA["train_data"]
-    folds_df = GLOBAL_DATA["folds_df"]
-    allele_encoding = GLOBAL_DATA["allele_encoding"]
-    args = GLOBAL_DATA["args"]
+    df = constant_data["train_data"]
+    folds_df = constant_data["folds_df"]
+    allele_encoding = constant_data["allele_encoding"]
+    args = constant_data["args"]

     if predictor is None:
         predictor = Class1AffinityPredictor(
...
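The new constant_data=GLOBAL_DATA default keeps a single code path for both modes: local workers forked from the parent process inherit the already populated module-level GLOBAL_DATA, while a cluster worker can pass an explicitly deserialized dict instead. A toy demonstration of the pattern:

# Toy demonstration of the constant_data pattern; values are placeholders.
# Note the default binds the dict object at definition time, so GLOBAL_DATA
# must be mutated in place (as the real code does), never reassigned.
GLOBAL_DATA = {"train_data": "inherited-by-forked-local-workers"}

def train_model(work_item_num, constant_data=GLOBAL_DATA):
    return (work_item_num, constant_data["train_data"])

print(train_model(0))  # local mode: reads GLOBAL_DATA
print(train_model(1, constant_data={"train_data": "unpickled-on-cluster-node"}))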
@@ -87,6 +87,8 @@ if __name__ == '__main__':
             'mhcflurry.select_pan_allele_models_command:run',
             'mhcflurry-calibrate-percentile-ranks = '
             'mhcflurry.calibrate_percentile_ranks_command:run',
+            '_mhcflurry-cluster-worker-entry-point = '
+            'mhcflurry.cluster_parallelism:worker_entry_point',
         ]
     },
     classifiers=[
...
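The leading underscore marks _mhcflurry-cluster-worker-entry-point as an internal console script, presumably the command each cluster job runs on its node. Its real signature is not shown in this diff; a guess at the general shape, with the argument handling and file layout invented:

# Hypothetical sketch of a worker entry point; not the real implementation.
import pickle
import sys

def worker_entry_point(argv=sys.argv[1:]):
    # Load the pickled work item: function reference, kwargs, constant data.
    item_path = argv[0]
    with open(item_path, "rb") as f:
        payload = pickle.load(f)
    result = payload["work_function"](
        constant_data=payload["constant_data"], **payload["kwargs"])
    # Write the result where the submitting process polls for it.
    with open(item_path + ".result", "wb") as f:
        pickle.dump(result, f)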
@@ -101,7 +101,7 @@ HYPERPARAMETERS_LIST = [
 ][1:]


-def run_and_check(n_jobs=0, delete=True):
+def run_and_check(n_jobs=0, delete=True, additional_args=[]):
     models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
     hyperparameters_filename = os.path.join(
         models_dir, "hyperparameters.yaml")
@@ -125,7 +125,7 @@ def run_and_check(n_jobs=0, delete=True):
         "--verbosity", "1",
         "--pretrain-data", get_path(
             "random_peptide_predictions", "predictions.csv.bz2"),
-    ]
+    ] + additional_args
     print("Running with args: %s" % args)
     subprocess.check_call(args)
@@ -144,7 +144,7 @@ def run_and_check(n_jobs=0, delete=True):
         print("Deleting: %s" % models_dir)
         shutil.rmtree(models_dir)

+"""
 if os.environ.get("KERAS_BACKEND") != "theano":
     def test_run_parallel():
         run_and_check(n_jobs=1)
@@ -153,6 +153,14 @@ if os.environ.get("KERAS_BACKEND") != "theano":
 def test_run_serial():
     run_and_check(n_jobs=0)
+"""
+
+
+def test_run_cluster_parallelism():
+    run_and_check(n_jobs=0, additional_args=[
+        '--cluster-parallelism',
+        '--cluster-results-workdir', '/tmp/'
+    ])


 if __name__ == "__main__":
...