-
Tim O'Donnell authored
Switch to using parallel_backend.map instead of parallel_backend.submit, to be compatible with kubeface
Tim O'Donnell authored: Switch to using parallel_backend.map instead of parallel_backend.submit, to be compatible with kubeface
parallelism.py 3.43 KiB
from concurrent import futures
import logging
# Module-wide backend returned by get_default_backend(). It stays None until
# set_default_backend() is called, or until get_default_backend() lazily
# installs a ConcurrentFuturesParallelBackend on first use.
DEFAULT_BACKEND = None
class ParallelBackend(object):
    """
    Thin wrapper of futures implementations. Designed to support
    concurrent.futures as well as dask.distributed's workalike implementation.

    Subclasses implement ``map(func, iterable)``, which applies ``func`` to
    every element of ``iterable`` and returns the results.

    Parameters
    ----------
    executor : object
        The underlying executor that subclasses submit work to.
    module : module
        Module whose ``as_completed`` function subclasses use to wait on the
        executor's futures (e.g. ``concurrent.futures``).
    verbose : int
        Verbosity level, stored on the instance for subclasses.
    """
    def __init__(self, executor, module, verbose=1):
        self.executor = executor
        self.module = module
        self.verbose = verbose

    def map(self, func, iterable):
        """
        Apply ``func`` to each element of ``iterable`` and return the results.

        Every concrete backend must override this; the base class only
        declares the interface so misuse fails with a clear error instead of
        an AttributeError.
        """
        raise NotImplementedError(
            "%s does not implement map()" % type(self).__name__)
class KubefaceParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses kubeface.

    Work is delegated to a kubeface ``Client`` constructed from ``args``
    (presumably a parsed argparse namespace -- confirm against callers);
    no local executor is used.
    """
    def __init__(self, args):
        from kubeface import Client  # pylint: disable=import-error
        # Run the base initializer so this backend exposes the same
        # attributes (executor, module, verbose) as the other backends.
        # Kubeface is driven through its client, so there is no executor or
        # futures module to wrap.
        ParallelBackend.__init__(self, executor=None, module=None)
        self.client = Client.from_args(args)

    def map(self, func, iterable):
        """
        Apply ``func`` to each element of ``iterable`` using the kubeface
        client, returning whatever ``Client.map`` returns.
        """
        return self.client.map(func, iterable)

    def __str__(self):
        return "<Kubeface backend, client=%s>" % self.client
class DaskDistributedParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses dask.distributed.

    Parameters
    ----------
    scheduler_ip_and_port : str
        Address of the dask scheduler (presumably "ip:port"), passed to
        ``distributed.Executor``.
    verbose : int
        Verbosity level, stored on the instance.
    """
    def __init__(self, scheduler_ip_and_port, verbose=1):
        from dask import distributed  # pylint: disable=import-error
        executor = distributed.Executor(scheduler_ip_and_port)
        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
        self.scheduler_ip_and_port = scheduler_ip_and_port

    def map(self, func, iterable):
        """
        Submit ``func(arg)`` for every ``arg`` in ``iterable`` and block until
        all tasks finish. Results are returned in the order of ``iterable``.
        """
        fs = [self.executor.submit(func, arg) for arg in iterable]
        return self.wait(fs)

    def wait(self, fs):
        """
        Block until every future in ``fs`` completes, logging progress.

        Returns the results in the same order as ``fs``. Any exception
        raised by a future's ``result()`` propagates to the caller.
        """
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            result_dict[finished_future] = finished_future.result()
            # Log after recording the result so the count includes the task
            # that just finished (progress runs 1..N, not 0..N-1).
            logging.info(
                "%3d / %3d tasks completed", len(result_dict), len(fs))
        return [result_dict[future] for future in fs]

    def __str__(self):
        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
            self.scheduler_ip_and_port,
            sum(self.executor.ncores().values()))
class ConcurrentFuturesParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses Python's concurrent.futures module.
    Can use either threads or processes.

    Parameters
    ----------
    num_workers : int
        Maximum number of worker threads or processes.
    processes : bool
        If True, use a ProcessPoolExecutor; otherwise a ThreadPoolExecutor.
    verbose : int
        Verbosity level, stored on the instance.
    """
    def __init__(self, num_workers=1, processes=False, verbose=1):
        if processes:
            executor = futures.ProcessPoolExecutor(num_workers)
        else:
            executor = futures.ThreadPoolExecutor(num_workers)
        ParallelBackend.__init__(self, executor, futures, verbose=verbose)
        self.num_workers = num_workers
        self.processes = processes

    def __str__(self):
        return "<Concurrent futures %s parallel backend, num workers = %d>" % (
            ("processes" if self.processes else "threads"), self.num_workers)

    def map(self, func, iterable):
        """
        Submit ``func(arg)`` for every ``arg`` in ``iterable`` and block until
        all tasks finish. Results are returned in the order of ``iterable``.
        """
        fs = [self.executor.submit(func, arg) for arg in iterable]
        return self.wait(fs)

    def wait(self, fs):
        """
        Block until every future in ``fs`` completes, logging progress.

        Returns the results in the same order as ``fs``. Any exception
        raised by a future's ``result()`` propagates to the caller.
        """
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            result_dict[finished_future] = finished_future.result()
            # Log after recording the result so the count includes the task
            # that just finished (progress runs 1..N, not 0..N-1).
            logging.info(
                "%3d / %3d tasks completed", len(result_dict), len(fs))
        return [result_dict[future] for future in fs]
def set_default_backend(backend):
    """
    Install ``backend`` as the module-wide default returned by
    get_default_backend().
    """
    global DEFAULT_BACKEND
    DEFAULT_BACKEND = backend


def get_default_backend():
    """
    Return the module-wide default backend.

    If no default has been set, a ConcurrentFuturesParallelBackend with its
    default settings (one worker thread) is created, installed as the
    default, and returned.
    """
    # DEFAULT_BACKEND is only read here; the assignment happens inside
    # set_default_backend, so no ``global`` declaration is needed.
    if DEFAULT_BACKEND is None:
        set_default_backend(ConcurrentFuturesParallelBackend())
    return DEFAULT_BACKEND