Skip to content
Snippets Groups Projects
Commit bc78f82f authored by Tim O'Donnell's avatar Tim O'Donnell
Browse files

basic support for kubeface

parent 4e3ccd35
No related merge requests found
......@@ -169,6 +169,12 @@ parser.add_argument(
default=False,
help="Output more info")
try:
import kubeface
kubeface.Client.add_args(parser)
except ImportError:
logging.error("Kubeface support disabled, not installed.")
def run(argv=sys.argv[1:]):
args = parser.parse_args(argv)
......@@ -183,6 +189,8 @@ def run(argv=sys.argv[1:]):
if args.dask_scheduler:
backend = parallelism.DaskDistributedParallelBackend(
args.dask_scheduler)
elif hasattr(args, 'storage_prefix') and args.storage_prefix:
backend = parallelism.KubefaceParallelBackend(args)
else:
if args.num_local_processes:
backend = parallelism.ConcurrentFuturesParallelBackend(
......
......@@ -36,6 +36,21 @@ class ParallelBackend(object):
return [result_dict[future] for future in fs]
class KubefaceParallelBackend(ParallelBackend):
"""
ParallelBackend that uses kubeface
"""
def __init__(self, args):
from kubeface import Client # pylint: disable=import-error
self.client = Client.from_args(args)
def map(self, func, iterable):
return self.client.map(func, iterable)
def __str__(self):
return "<Kubeface backend, client=%s>" % self.client
class DaskDistributedParallelBackend(ParallelBackend):
"""
ParallelBackend that uses dask.distributed
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment