Commit e802c87b authored by Tim O'Donnell

Add --additional-complete-file cluster parallelism argument

parent 49554ca3

@@ -8,17 +8,21 @@
 #BSUB -W 46:00 # walltime in HH:MM
 #BSUB -R rusage[mem=30000] # mb memory requested
 #BSUB -o {work_dir}/%J.stdout # output log (%J : JobID)
-#BSUB -eo {work_dir}/%J.stderr # error log
+#BSUB -eo {work_dir}/STDERR # error log
 #BSUB -L /bin/bash # Initialize the execution environment
 #
+set -e
+set -x
+echo "Subsequent stderr output redirected to stdout" >&2
+exec 2>&1
 export TMPDIR=/local/JOBS/mhcflurry-{work_item_num}
 export PATH=$HOME/.conda/envs/py36b/bin/:$PATH
 export PYTHONUNBUFFERED=1
 export KMP_SETTINGS=1
-set -e
-set -x
 free -m
 module add cuda/10.0.130 cudnn/7.1.1
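
The changed "#BSUB -eo" line is the heart of this commit: LSF typically writes the file named by -eo only when the job terminates, and since the script immediately redirects its own stderr to stdout (the "exec 2>&1" above), the appearance of {work_dir}/STDERR is a reliable sign that the job has ended, even if it was killed (for example at the 46:00 walltime) before writing its normal completion marker. The new --additional-complete-file option below watches for exactly this file. As a side note on the template itself, {work_dir} and {work_item_num} look like Python str.format fields; a minimal sketch of how such a header is presumably rendered per work item (file name and values are illustrative, not taken from this commit):

# Hypothetical rendering of the submit-script header template shown above.
with open("cluster_submit_script_header.mssm_hpc.lsf") as fd:  # illustrative path
    header_template = fd.read()
script_header = header_template.format(
    work_dir="/path/to/cluster-workdir/item-0",  # illustrative work dir
    work_item_num=0)                             # illustrative item number
print(script_header)
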
@@ -44,6 +44,10 @@ def add_cluster_parallelism_args(parser):
"--cluster-results-workdir",
default='./cluster-workdir',
help="Default: %(default)s")
group.add_argument(
"--additional-complete-file",
default='STDERR',
help="Additional file to monitor for job completion. Default: %(default)s")
group.add_argument(
'--cluster-script-prefix-path',
help="",
@@ -85,6 +89,7 @@ def cluster_results_from_args(
         constant_data=constant_data,
         submit_command=args.cluster_submit_command,
         results_workdir=args.cluster_results_workdir,
+        additional_complete_file=args.additional_complete_file,
         script_prefix_path=args.cluster_script_prefix_path,
         result_serialization_method=result_serialization_method,
         clear_constant_data=clear_constant_data
@@ -97,6 +102,7 @@ def cluster_results(
         constant_data=None,
         submit_command="sh",
         results_workdir="./cluster-workdir",
+        additional_complete_file=None,
         script_prefix_path=None,
         result_serialization_method="pickle",
         max_retries=3,
@@ -230,6 +236,15 @@ def cluster_results(
             if os.path.exists(d['finished_path']):
                 result_item = d
                 break
+            if additional_complete_file:
+                additional_complete_file_path = os.path.join(
+                    d['work_dir'],
+                    additional_complete_file)
+                if os.path.exists(additional_complete_file_path):
+                    result_item = d
+                    print("Exists", additional_complete_file_path)
+                    break
         if result_item is None:
             time.sleep(60)
         else:
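
Note the split defaults: cluster_results() takes additional_complete_file=None, so the extra check is opt-in for direct callers, while the CLI flag defaults to 'STDERR'. The completion test itself is simple; here it is extracted into a standalone helper (a sketch for clarity, not a function in the module): a work item counts as done when its normal finished marker exists or, if additional_complete_file is set, when that file has appeared in the item's work directory.

import os

def first_completed_item(result_items, additional_complete_file=None):
    # Each result item is a dict carrying (at least) 'finished_path' and
    # 'work_dir', as in the loop above.
    for d in result_items:
        if os.path.exists(d['finished_path']):
            return d
        if additional_complete_file:
            path = os.path.join(d['work_dir'], additional_complete_file)
            if os.path.exists(path):
                print("Exists", path)
                return d
    return None  # nothing finished yet; the caller sleeps 60 s and re-polls
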
@@ -245,25 +260,33 @@ def cluster_results(
         print("[%0.1f sec elapsed] processing item %s" % (
             time.time() - start, result_item))
-        if os.path.exists(error_path):
-            print("Error path exists", error_path)
-            with open(error_path, "rb") as fd:
-                exception = pickle.load(fd)
-            print(exception)
-            if retry_num < max_retries:
-                print("Relaunching", launch_command)
-                attempt_dir = os.path.join(
-                    result_item['work_dir'], "attempt.%d" % retry_num)
+        if os.path.exists(error_path) or not os.path.exists(result_path):
+            if os.path.exists(error_path):
+                print("Error path exists", error_path)
+                with open(error_path, "rb") as fd:
+                    exception = pickle.load(fd)
+                print(exception)
+            if not os.path.exists(result_path):
+                print("Result path does NOT exist", result_path)
+            if retry_num < max_retries:
+                print("Relaunching", launch_command)
+                attempt_dir = os.path.join(
+                    result_item['work_dir'], "attempt.%d" % retry_num)
+                if os.path.exists(additional_complete_file_path):
+                    shutil.move(additional_complete_file_path, attempt_dir)
                 if os.path.exists(complete_dir):
                     shutil.move(complete_dir, attempt_dir)
                 if os.path.exists(error_path):
                     shutil.move(error_path, attempt_dir)
-                subprocess.check_call(launch_command, shell=True)
-                print("Invoked", launch_command)
-                result_item['retry_num'] += 1
-                result_items.append(result_item)
-                continue
-            else:
-                print("Max retries exceeded", max_retries)
-                raise exception
+                subprocess.check_call(launch_command, shell=True)
+                print("Invoked", launch_command)
+                result_item['retry_num'] += 1
+                result_items.append(result_item)
+                continue
+            else:
+                print("Max retries exceeded", max_retries)
+                raise exception
         if os.path.exists(result_path):
             print("Result path exists", result_path)