diff --git a/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py b/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py
index 6e20239c8185561217aa5e38e6fb4c2ac0d30677..6375cd4510bfebadd4df529a6884c1eb1632f162 100644
--- a/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py
+++ b/downloads-generation/models_class1_allele_specific_single_kim2014_only/models.py
@@ -3,13 +3,13 @@ from mhcflurry.class1_allele_specific.train import HYPERPARAMETER_DEFAULTS
 import json
 
 models = HYPERPARAMETER_DEFAULTS.models_grid(
-    impute=[False, True],
+    #impute=[False, True],
+    impute=[False],
     activation=["tanh"],
     layer_sizes=[[12], [64], [128]],
     embedding_output_dim=[8, 32, 64],
     dropout_probability=[0, .1, .25],
-    pretrain_decay=["1 / (1+epoch)**2"],
-    fraction_negative=[0, .1, .2],
+    # fraction_negative=[0, .1, .2],
     n_training_epochs=[250])
 
 sys.stderr.write("Models: %d\n" % len(models))
diff --git a/mhcflurry/class1_allele_specific/cross_validation.py b/mhcflurry/class1_allele_specific/cross_validation.py
index 63852548670970e1ac90816b7934cebc1ad54ec8..6bcd092cbe2b4c4fe11e4bff273c6c3e8d04a909 100644
--- a/mhcflurry/class1_allele_specific/cross_validation.py
+++ b/mhcflurry/class1_allele_specific/cross_validation.py
@@ -20,11 +20,10 @@ from __future__ import (
 import collections
 import logging
 
-
 import pepdata
 
 from .train import impute_and_select_allele, AlleleSpecificTrainTestFold
-from ..parallelism import get_default_executor
+from ..parallelism import get_default_backend
 
 gbmr4_transformer = pepdata.reduced_alphabet.make_alphabet_transformer("gbmr4")
 
@@ -100,7 +99,7 @@ def cross_validation_folds(
             'min_observations_per_peptide': 2,
             'min_observations_per_allele': 2,
         },
-        executor=None):
+        parallel_backend=None):
     '''
     Split a Dataset into n_folds cross validation folds for each allele,
     optionally performing imputation.
@@ -136,8 +135,8 @@ def cross_validation_folds(
     list of AlleleSpecificTrainTestFold of length num alleles * n_folds
 
     '''
-    if executor is None:
-        executor = get_default_executor()
+    if parallel_backend is None:
+        parallel_backend = get_default_backend()
 
     if alleles is None:
         alleles = train_data.unique_alleles()
@@ -166,7 +165,7 @@ def cross_validation_folds(
                 test_split = full_test_split
 
             if imputer is not None:
-                imputation_future = executor.submit(
+                imputation_future = parallel_backend.submit(
                     impute_and_select_allele,
                     all_allele_train_split,
                     imputer=imputer,
diff --git a/mhcflurry/class1_allele_specific/cv_and_train_command.py b/mhcflurry/class1_allele_specific/cv_and_train_command.py
index cda07b7e9218c155837b2b75918816a10ed573ff..fea31a9558ab13424d08ca20e1d16eac2c590409 100644
--- a/mhcflurry/class1_allele_specific/cv_and_train_command.py
+++ b/mhcflurry/class1_allele_specific/cv_and_train_command.py
@@ -48,9 +48,7 @@ import pickle
 
 import numpy
 
-from dask import distributed
-
-from ..parallelism import set_default_executor, get_default_executor
+from .. import parallelism
 from ..dataset import Dataset
 from ..imputation_helpers import imputer_from_name
 from .cross_validation import cross_validation_folds
@@ -138,6 +136,19 @@ parser.add_argument(
     metavar="HOST:PORT",
     help="Host and port of dask distributed scheduler")
 
+parser.add_argument(
+    "--num-local-processes",
+    metavar="N",
+    type=int,
+    help="Processes (exclusive with --dask-scheduler and --num-local-threads)")
+
+parser.add_argument(
+    "--num-local-threads",
+    metavar="N",
+    type=int,
+    default=1,
+    help="Threads (exclusive with --dask-scheduler and --num-local-processes)")
+
 parser.add_argument(
     "--min-samples-per-allele",
     default=100,
@@ -161,22 +172,36 @@ parser.add_argument(
 
 def run(argv=sys.argv[1:]):
     args = parser.parse_args(argv)
-    if not args.quiet:
-        logging.basicConfig(level="INFO")
     if args.verbose:
-        logging.basicConfig(level="DEBUG")
+        logging.root.setLevel(level="DEBUG")
+    elif not args.quiet:
+        logging.root.setLevel(level="INFO")
+
+    logging.info("Running with arguments: %s" % args)
+    print("Past logging")
+
+
+    # Set parallel backend
     if args.dask_scheduler:
-        executor = distributed.Executor(args.dask_scheduler)
-        set_default_executor(executor)
-        logging.info(
-            "Running with dask scheduler: %s [%s cores]" % (
-                args.dask_scheduler,
-                sum(executor.ncores().values())))
+        backend = parallelism.DaskDistributedParallelBackend(
+            args.dask_scheduler)
+    else:
+        if args.num_local_processes:
+            backend = parallelism.ConcurrentFuturesParallelBackend(
+                args.num_local_processes,
+                processes=True)
+        else:
+            backend = parallelism.ConcurrentFuturesParallelBackend(
+                args.num_local_threads,
+                processes=False)
+
+    parallelism.set_default_backend(backend)
+    logging.info("Using parallel backend: %s" % backend)
     go(args)
 
 
 def go(args):
-    executor = get_default_executor()
+    backend = parallelism.get_default_backend()
 
     model_architectures = json.loads(args.model_architectures.read())
     logging.info("Read %d model architectures" % len(model_architectures))
@@ -292,7 +317,7 @@ def go(args):
             (allele, best_index, architecture))
 
         if architecture['impute']:
-            imputation_future = executor.submit(
+            imputation_future = backend.submit(
                 impute_and_select_allele,
                 train_data,
                 imputer=imputer,
diff --git a/mhcflurry/class1_allele_specific/train.py b/mhcflurry/class1_allele_specific/train.py
index 73937f7a06144267367b75e5728a580574e03e69..c4f29e4936adb83d235c5bdba1683a8c181f34a2 100644
--- a/mhcflurry/class1_allele_specific/train.py
+++ b/mhcflurry/class1_allele_specific/train.py
@@ -31,7 +31,7 @@ import mhcflurry
 from .scoring import make_scores
 from .class1_binding_predictor import Class1BindingPredictor
 from ..hyperparameters import HyperparameterDefaults
-from ..parallelism import get_default_executor, map_throw_fast
+from ..parallelism import get_default_backend
 
 
 TRAIN_HYPERPARAMETER_DEFAULTS = HyperparameterDefaults(impute=False)
@@ -239,7 +239,7 @@ def train_across_models_and_folds(
         cartesian_product_of_folds_and_models=True,
         return_predictors=False,
         folds_per_task=1,
-        executor=None):
+        parallel_backend=None):
     '''
     Train and optionally test any number of models across any number of folds.
 
@@ -259,14 +259,14 @@ def train_across_models_and_folds(
     return_predictors : boolean, optional
         Include the trained predictors in the result.
 
-    executor : 
+    parallel_backend : parallel backend, optional
 
     Returns
     -----------
     pandas.DataFrame
     '''
-    if executor is None:
-        executor = get_default_executor()
+    if parallel_backend is None:
+        parallel_backend = get_default_backend()
 
     if cartesian_product_of_folds_and_models:
         tasks_per_model = int(math.ceil(float(len(folds)) / folds_per_task))
@@ -303,8 +303,7 @@ def train_across_models_and_folds(
             [folds[i] for i in fold_nums],
             return_predictor=return_predictors)
 
-    task_results = map_throw_fast(
-        executor,
+    task_results = parallel_backend.map(
         train_and_test_one_model_task,
         task_model_and_fold_indices)
 
diff --git a/mhcflurry/parallelism.py b/mhcflurry/parallelism.py
index 73e9cfce34cd686a99beb2b307e2ad7c8d584ecd..d22473bb4803485c4cd762f82689ccd41318dc43 100644
--- a/mhcflurry/parallelism.py
+++ b/mhcflurry/parallelism.py
@@ -1,34 +1,72 @@
 from concurrent import futures
 import logging
 
-DEFAULT_EXECUTOR = None
+DEFAULT_BACKEND = None
 
 
-def set_default_executor(executor):
-    global DEFAULT_EXECUTOR
-    DEFAULT_EXECUTOR = executor
+class ParallelBackend(object):
+    def __init__(self, executor, module, verbose=1):
+        self.executor = executor
+        self.module = module
+        self.verbose = verbose
 
+    def submit(self, func, *args, **kwargs):
+        if self.verbose > 0:
+            logging.debug("Submitting: %s %s %s" % (func, args, kwargs))
+        return self.executor.submit(func, *args, **kwargs)
 
-def get_default_executor():
-    global DEFAULT_EXECUTOR
-    if DEFAULT_EXECUTOR is None:
-        DEFAULT_EXECUTOR = futures.ThreadPoolExecutor(max_workers=1)
-    return DEFAULT_EXECUTOR
+    def map(self, func, iterable):
+        fs = [
+            self.executor.submit(func, arg) for arg in iterable
+        ]
+        return self.wait(fs)
 
+    def wait(self, fs):
+        result_dict = {}
+        for finished_future in self.module.as_completed(fs):
+            result = finished_future.result()
+            logging.info("%3d / %3d tasks completed" % (
+                len(result_dict), len(fs)))
+            result_dict[finished_future] = result
 
-def map_throw_fast(executor, func, iterable):
-    futures = [
-        executor.submit(func, arg) for arg in iterable
-    ]
-    return wait_all_throw_fast(futures)
+        return [result_dict[future] for future in fs]
 
 
-def wait_all_throw_fast(fs):
-    result_dict = {}
-    for finished_future in futures.as_completed(fs):
-        result = finished_future.result()
-        logging.info("%3d / %3d tasks completed" % (
-            len(result_dict), len(fs)))
-        result_dict[finished_future] = result
+class DaskDistributedParallelBackend(ParallelBackend):
+    def __init__(self, scheduler_ip_and_port, verbose=1):
+        from dask import distributed
+        executor = distributed.Executor(scheduler_ip_and_port)
+        ParallelBackend.__init__(self, executor, distributed, verbose=verbose)
+        self.scheduler_ip_and_port = scheduler_ip_and_port
 
-    return [result_dict[future] for future in fs]
+    def __str__(self):
+        return "<Dask distributed backend, scheduler=%s, total_cores=%d>" % (
+            self.scheduler_ip_and_port,
+            sum(self.executor.ncores().values()))
+
+
+class ConcurrentFuturesParallelBackend(ParallelBackend):
+    def __init__(self, num_workers=1, processes=False, verbose=1):
+        if processes:
+            executor = futures.ProcessPoolExecutor(num_workers)
+        else:
+            executor = futures.ThreadPoolExecutor(num_workers)
+        ParallelBackend.__init__(self, executor, futures, verbose=verbose)
+        self.num_workers = num_workers
+        self.processes = processes
+
+    def __str__(self):
+        return "<Concurrent futures %s parallel backend, num workers = %d>" % (
+            ("processes" if self.processes else "threads"), self.num_workers)
+
+
+def set_default_backend(backend):
+    global DEFAULT_BACKEND
+    DEFAULT_BACKEND = backend
+
+
+def get_default_backend():
+    global DEFAULT_BACKEND
+    if DEFAULT_BACKEND is None:
+        set_default_backend(ConcurrentFuturesParallelBackend())
+    return DEFAULT_BACKEND
diff --git a/test/test_class1_allele_specific_cv_and_train_command.py b/test/test_class1_allele_specific_cv_and_train_command.py
index a439b5151977414f6e8f4f9cb7addec2086159b5..fdc123be44d31c1602c0b971b9cbd91247183b38 100644
--- a/test/test_class1_allele_specific_cv_and_train_command.py
+++ b/test/test_class1_allele_specific_cv_and_train_command.py
@@ -60,6 +60,7 @@ def test_small_run():
         "--cv-num-folds", "2",
         "--alleles", "HLA-A0201", "HLA-A0301",
         "--verbose",
+        "--num-local-threads", "2",
     ]
     print("Running cv_and_train_command with args: %s " % str(args))