# Skip to content
# Snippets Groups Projects
# parallelism.py 2.38 KiB
# Newer Older
from concurrent import futures
import logging

# Module-level default backend. Stays None until set_default_backend() is
# called or get_default_backend() creates a ConcurrentFuturesParallelBackend.
DEFAULT_BACKEND = None
class ParallelBackend(object):
    """
    Thin wrapper around a futures-style executor.

    Parameters
    ----------
    executor : object
        Object exposing ``submit(func, *args, **kwargs)`` that returns a
        future with a ``result()`` method.
    module : object
        Namespace exposing ``as_completed(futures)`` for the futures produced
        by `executor`.
    verbose : int
        When greater than 0, every call to `submit` is logged at DEBUG level.
    """
    def __init__(self, executor, module, verbose=1):
        self.executor = executor
        self.module = module
        self.verbose = verbose

    def submit(self, func, *args, **kwargs):
        """
        Schedule ``func(*args, **kwargs)`` on the executor.

        Returns the future produced by the executor's ``submit``.
        """
        if self.verbose > 0:
            # Lazy %-args: the message is only built if DEBUG is enabled.
            logging.debug("Submitting: %s %s %s", func, args, kwargs)
        return self.executor.submit(func, *args, **kwargs)

    def map(self, func, iterable):
        """
        Apply `func` to every item of `iterable` and wait for all results.

        Returns a list of results in the same order as `iterable`.
        """
        fs = [
            self.executor.submit(func, arg) for arg in iterable
        ]
        return self.wait(fs)

    def wait(self, fs):
        """
        Block until every future in `fs` has finished.

        Progress is logged at INFO level after each completion. Any exception
        raised by a future's ``result()`` propagates to the caller.

        Returns a list of results ordered like `fs` (not completion order).
        """
        # Materialize so that len() and the final ordered pass also work when
        # a one-shot iterable (e.g. a generator) is passed in.
        fs = list(fs)
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            # Record the result before logging so that the reported count
            # includes the task that has just finished.
            result_dict[finished_future] = finished_future.result()
            logging.info(
                "%3d / %3d tasks completed", len(result_dict), len(fs))
        return [result_dict[future] for future in fs]
class DaskDistributedParallelBackend(ParallelBackend):
    """
    Parallel backend that submits work to a dask distributed scheduler.

    Parameters
    ----------
    scheduler_ip_and_port : str
        Address of the scheduler, passed unchanged to the dask client.
    verbose : int
        Forwarded to `ParallelBackend`.
    """
    def __init__(self, scheduler_ip_and_port, verbose=1):
        # Imported lazily so the module can be used without dask installed.
        from dask import distributed  # pylint: disable=import-error
        # Newer distributed releases call the client class ``Client``; older
        # ones only provide ``Executor``. Prefer the new name, fall back to
        # the old one.
        client_class = getattr(distributed, "Client", None)
        if client_class is None:
            client_class = distributed.Executor
        executor = client_class(scheduler_ip_and_port)
        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
        self.scheduler_ip_and_port = scheduler_ip_and_port

    def __str__(self):
        # ncores() is summed over its values to report the cluster-wide total.
        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
            self.scheduler_ip_and_port,
            sum(self.executor.ncores().values()))


class ConcurrentFuturesParallelBackend(ParallelBackend):
    """
    Parallel backend backed by a ``concurrent.futures`` pool.

    Parameters
    ----------
    num_workers : int
        Size of the pool, passed to the executor constructor.
    processes : bool
        If true use a ``ProcessPoolExecutor``, otherwise a
        ``ThreadPoolExecutor``.
    verbose : int
        Forwarded to `ParallelBackend`.
    """
    def __init__(self, num_workers=1, processes=False, verbose=1):
        pool_class = (
            futures.ProcessPoolExecutor if processes
            else futures.ThreadPoolExecutor)
        ParallelBackend.__init__(
            self, pool_class(num_workers), futures, verbose=verbose)
        self.num_workers = num_workers
        self.processes = processes

    def __str__(self):
        worker_kind = "processes" if self.processes else "threads"
        template = "<Concurrent futures %s parallel backend, num workers = %d>"
        return template % (worker_kind, self.num_workers)


def set_default_backend(backend):
    """
    Make `backend` the module-level default parallel backend.

    The value is stored in the module global ``DEFAULT_BACKEND``, replacing
    whatever was there before.
    """
    globals()["DEFAULT_BACKEND"] = backend


def get_default_backend():
    """
    Return the module-level default parallel backend.

    If none has been set yet, a ConcurrentFuturesParallelBackend built with
    its default arguments is installed via set_default_backend() first.
    """
    if DEFAULT_BACKEND is not None:
        return DEFAULT_BACKEND
    set_default_backend(ConcurrentFuturesParallelBackend())
    return DEFAULT_BACKEND