From bc78f82fb6ebce61ae6f6f32a0a1a70f2a7128be Mon Sep 17 00:00:00 2001 From: Tim O'Donnell <timodonnell@gmail.com> Date: Mon, 9 Jan 2017 16:53:02 -0500 Subject: [PATCH] basic support for kubeface --- .../cv_and_train_command.py | 8 ++++++++ mhcflurry/parallelism.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/mhcflurry/class1_allele_specific/cv_and_train_command.py b/mhcflurry/class1_allele_specific/cv_and_train_command.py index 075e8e37..b1256ac0 100644 --- a/mhcflurry/class1_allele_specific/cv_and_train_command.py +++ b/mhcflurry/class1_allele_specific/cv_and_train_command.py @@ -169,6 +169,12 @@ parser.add_argument( default=False, help="Output more info") +try: + import kubeface + kubeface.Client.add_args(parser) +except ImportError: + logging.error("Kubeface support disabled, not installed.") + def run(argv=sys.argv[1:]): args = parser.parse_args(argv) @@ -183,6 +189,8 @@ def run(argv=sys.argv[1:]): if args.dask_scheduler: backend = parallelism.DaskDistributedParallelBackend( args.dask_scheduler) + elif hasattr(args, 'storage_prefix') and args.storage_prefix: + backend = parallelism.KubefaceParallelBackend(args) else: if args.num_local_processes: backend = parallelism.ConcurrentFuturesParallelBackend( diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py index 18008b4e..4d0c47ac 100644 --- a/mhcflurry/parallelism.py +++ b/mhcflurry/parallelism.py @@ -36,6 +36,21 @@ class ParallelBackend(object): return [result_dict[future] for future in fs] +class KubefaceParallelBackend(ParallelBackend): + """ + ParallelBackend that uses kubeface + """ + def __init__(self, args): + from kubeface import Client # pylint: disable=import-error + self.client = Client.from_args(args) + + def map(self, func, iterable): + return self.client.map(func, iterable) + + def __str__(self): + return "<Kubeface backend, client=%s>" % self.client + + class DaskDistributedParallelBackend(ParallelBackend): """ ParallelBackend that uses dask.distributed -- GitLab