Skip to content
Snippets Groups Projects
Commit 4752df65 authored by Tim O'Donnell
Browse files

better handling of crashed workers

parent cc1543cf
No related merge requests found
......@@ -9,6 +9,8 @@ import time
import traceback
import random
from multiprocessing import Pool, Queue, cpu_count
from queue import Empty
from multiprocessing.util import Finalize
from functools import partial
from pprint import pprint
......@@ -181,10 +183,12 @@ def make_worker_pool(
if initializer_kwargs_per_process:
assert len(initializer_kwargs_per_process) == processes
kwargs_queue = Queue()
kwargs_queue2 = Queue()
for kwargs in initializer_kwargs_per_process:
kwargs_queue.put(kwargs)
kwargs_queue2.put(kwargs)
pool_kwargs["initializer"] = worker_init_entry_point
pool_kwargs["initargs"] = (initializer, kwargs_queue)
pool_kwargs["initargs"] = (initializer, kwargs_queue, kwargs_queue2)
else:
pool_kwargs["initializer"] = initializer
......@@ -194,16 +198,23 @@ def make_worker_pool(
return worker_pool
def worker_init_entry_point(init_function, arg_queue=None):
def worker_init_entry_point(
init_function, arg_queue=None, round_robin_arg_queue=None):
kwargs = {}
if arg_queue:
kwargs = arg_queue.get()
# We add the init args back to the queue so restarted workers (e.g. when
# running with maxtasksperchild) will pick up init arguments in a
# round-robin style.
arg_queue.put(kwargs)
else:
kwargs = {}
try:
kwargs = arg_queue.get(block=False)
except Empty:
print("Argument queue empty. Using round robin arg queue.")
kwargs = round_robin_arg_queue.get(block=True)
round_robin_arg_queue.put(kwargs)
# On exit we add the init args back to the queue so restarted workers
# (e.g. when running with maxtasksperchild) will pick up init
# arguments from a previously exited worker.
Finalize(None, arg_queue.put, (kwargs,), exitpriority=1)
print("Initializing worker: %s" % str(kwargs))
init_function(**kwargs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment