from concurrent import futures
import logging

DEFAULT_BACKEND = None


class ParallelBackend(object):
    """
    Thin wrapper of futures implementations. Designed to support
    concurrent.futures as well as dask.distributed's workalike implementation.
    """
    def __init__(self, executor, module, verbose=1):
        self.executor = executor
        self.module = module
        self.verbose = verbose


class KubefaceParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses kubeface
    """
    def __init__(self, args):
        from kubeface import Client  # pylint: disable=import-error
        # Kubeface manages its own client rather than a futures-style
        # executor, so the base class initializer is not used here.
        self.client = Client.from_args(args)

    def map(self, func, iterable):
        return self.client.map(func, iterable)

    def __str__(self):
        return "<Kubeface backend, client=%s>" % self.client


class DaskDistributedParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses dask.distributed
    """
    def __init__(self, scheduler_ip_and_port, verbose=1):
        from dask import distributed  # pylint: disable=import-error
        # Note: newer dask.distributed releases expose this class as
        # distributed.Client; Executor is the older name for it.
        executor = distributed.Executor(scheduler_ip_and_port)
        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
        self.scheduler_ip_and_port = scheduler_ip_and_port

    def map(self, func, iterable):
        fs = [
            self.executor.submit(func, arg) for arg in iterable
        ]
        return self.wait(fs)

    def wait(self, fs):
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            result_dict[finished_future] = finished_future.result()
            logging.info(
                "%3d / %3d tasks completed", len(result_dict), len(fs))
        # Return results in the same order the tasks were submitted.
        return [result_dict[future] for future in fs]

    def __str__(self):
        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
            self.scheduler_ip_and_port,
            sum(self.executor.ncores().values()))
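
# Usage sketch (illustrative only; the scheduler address below is an
# assumption, not something defined by this module):
#
#     backend = DaskDistributedParallelBackend("127.0.0.1:8786")
#     results = backend.map(lambda x: x * x, range(10))
#     # results come back in the same order as the input iterable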


class ConcurrentFuturesParallelBackend(ParallelBackend):
    """
    ParallelBackend that uses Python's concurrent.futures module.
    Can use either threads or processes.
    """
    def __init__(self, num_workers=1, processes=False, verbose=1):
        if processes:
            executor = futures.ProcessPoolExecutor(num_workers)
        else:
            executor = futures.ThreadPoolExecutor(num_workers)
        ParallelBackend.__init__(self, executor, futures, verbose=verbose)
        self.num_workers = num_workers
        self.processes = processes

    def __str__(self):
        return "<Concurrent futures %s parallel backend, num workers = %d>" % (
            ("processes" if self.processes else "threads"), self.num_workers)

    def map(self, func, iterable):
        fs = [
            self.executor.submit(func, arg) for arg in iterable
        ]
        return self.wait(fs)

    def wait(self, fs):
        result_dict = {}
        for finished_future in self.module.as_completed(fs):
            result_dict[finished_future] = finished_future.result()
            logging.info(
                "%3d / %3d tasks completed", len(result_dict), len(fs))
        # Return results in the same order the tasks were submitted.
        return [result_dict[future] for future in fs]
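
# Usage sketch for the thread-based backend (illustrative only):
#
#     backend = ConcurrentFuturesParallelBackend(num_workers=4)
#     results = backend.map(str.upper, ["a", "b", "c"])
#     # results == ["A", "B", "C"]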


def set_default_backend(backend):
    global DEFAULT_BACKEND
    DEFAULT_BACKEND = backend


def get_default_backend():
    if DEFAULT_BACKEND is None:
        set_default_backend(ConcurrentFuturesParallelBackend())
    return DEFAULT_BACKEND
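

# Minimal self-check sketch, assuming only the standard library: exercises
# the thread-based default backend, so nothing external is required.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    backend = get_default_backend()
    print(backend)
    print(backend.map(lambda x: x * x, range(5)))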