From 4752df65c80e58a36f6d4d456a7cfaf0e67bcf5c Mon Sep 17 00:00:00 2001 From: Tim O'Donnell <timodonnell@gmail.com> Date: Sun, 11 Feb 2018 13:02:49 -0500 Subject: [PATCH] better handling of crashed workers --- .../train_allele_specific_models_command.py | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/mhcflurry/train_allele_specific_models_command.py b/mhcflurry/train_allele_specific_models_command.py index 1f7395f8..98dfc408 100644 --- a/mhcflurry/train_allele_specific_models_command.py +++ b/mhcflurry/train_allele_specific_models_command.py @@ -9,6 +9,8 @@ import time import traceback import random from multiprocessing import Pool, Queue, cpu_count +from queue import Empty +from multiprocessing.util import Finalize from functools import partial from pprint import pprint @@ -181,10 +183,12 @@ def make_worker_pool( if initializer_kwargs_per_process: assert len(initializer_kwargs_per_process) == processes kwargs_queue = Queue() + kwargs_queue2 = Queue() for kwargs in initializer_kwargs_per_process: kwargs_queue.put(kwargs) + kwargs_queue2.put(kwargs) pool_kwargs["initializer"] = worker_init_entry_point - pool_kwargs["initargs"] = (initializer, kwargs_queue) + pool_kwargs["initargs"] = (initializer, kwargs_queue, kwargs_queue2) else: pool_kwargs["initializer"] = initializer @@ -194,16 +198,23 @@ def make_worker_pool( return worker_pool -def worker_init_entry_point(init_function, arg_queue=None): +def worker_init_entry_point( + init_function, arg_queue=None, round_robin_arg_queue=None): + kwargs = {} if arg_queue: - kwargs = arg_queue.get() - - # We add the init args back to the queue so restarted workers (e.g. when - # when running with maxtasksperchild) will pickup init arguments in a - # round-robin style. - arg_queue.put(kwargs) - else: - kwargs = {} + try: + kwargs = arg_queue.get(block=False) + except Empty: + print("Argument queue empty. 
Using round robin arg queue.") + kwargs = round_robin_arg_queue.get(block=True) + round_robin_arg_queue.put(kwargs) + + # On exit we add the init args back to the queue so restarted workers + # (e.g. when running with maxtasksperchild) will pick up init + # arguments from a previously exited worker. + Finalize(None, arg_queue.put, (kwargs,), exitpriority=1) + + print("Initializing worker: %s" % str(kwargs)) init_function(**kwargs) -- GitLab