From 0f1946b84f639c9f1b17071646719be7eaabd878 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Thu, 25 Jun 2020 11:16:49 +0800 Subject: [PATCH 1/5] init --- joblibspark/backend.py | 80 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 7 deletions(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index 05d9374..51181d3 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -17,6 +17,8 @@ """ The joblib spark backend implementation. """ +import sys +import logging import warnings from multiprocessing.pool import ThreadPool import uuid @@ -32,6 +34,20 @@ from pyspark.util import VersionUtils +def _get_logger(name): + """ Gets a logger by name, or creates and configures it for the first time. """ + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + # If the logger is configured, skip the configure + if not logger.handlers and not logging.getLogger().handlers: + handler = logging.StreamHandler(sys.stderr) + logger.addHandler(handler) + return logger + + +logger = _get_logger("joblib-spark") + + def register(): """ Register joblib spark backend. @@ -56,6 +72,8 @@ class SparkDistributedBackend(ParallelBackendBase, AutoBatchingMixin): Each task batch will be run inside one spark task on worker node, and will be executed by `SequentialBackend` """ + # Hard cap on the number of concurrent hyperopt tasks (Spark jobs) to run. Set at 128. + MAX_CONCURRENT_JOBS_ALLOWED = 128 def __init__(self, **backend_args): super(SparkDistributedBackend, self).__init__(**backend_args) @@ -77,19 +95,67 @@ def _cancel_all_jobs(self): else: self._spark.sparkContext.cancelJobGroup(self._job_group) + @staticmethod + def _decide_parallelism( + requested_parallelism, spark_default_parallelism, max_num_concurrent_tasks + ): + """ + Given the requested parallelism, return the max parallelism SparkTrials will actually use. + See the docstring for `parallelism` in the constructor for expected behavior. + """ + if max_num_concurrent_tasks == 0: + logger.warning( + "The cluster has no executors currently. " + "The trials won't start until some new executors register." + ) + + if requested_parallelism is None or requested_parallelism <= 0: + parallelism = max(spark_default_parallelism, max_num_concurrent_tasks, 1) + logger.warning( + "Because the requested parallelism was None or a non-positive value, " + "parallelism will be set to ({d}), which is Spark's default parallelism ({s}), " + "or the current total of Spark task slots ({t}), or 1, whichever is greater. " + "We recommend setting parallelism explicitly to a positive value because " + "the total of Spark task slots is subject to cluster sizing.".format( + d=parallelism, + s=spark_default_parallelism, + t=max_num_concurrent_tasks, + ) + ) + else: + parallelism = requested_parallelism + + if parallelism > SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED: + logger.warning( + "Parallelism ({p}) is capped at SparkTrials.MAX_CONCURRENT_JOBS_ALLOWED ({c}).".format( + p=parallelism, c=SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED + ) + ) + parallelism = SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED + + if parallelism > max_num_concurrent_tasks: + logger.warning( + "Parallelism ({p}) is greater than the current total of Spark task slots ({c}). " + "If dynamic allocation is enabled, you might see more executors allocated.".format( + p=requested_parallelism, c=max_num_concurrent_tasks + ) + ) + return parallelism + def effective_n_jobs(self, n_jobs): max_num_concurrent_tasks = self._get_max_num_concurrent_tasks() + spark_default_parallelism = self._spark_context.defaultParallelism if n_jobs is None: n_jobs = 1 elif n_jobs == -1: # n_jobs=-1 means requesting all available workers - n_jobs = max_num_concurrent_tasks - if n_jobs > max_num_concurrent_tasks: - warnings.warn("User-specified n_jobs ({n}) is greater than the max number of " - "concurrent tasks ({c}) this cluster can run now. If dynamic " - "allocation is enabled for the cluster, you might see more " - "executors allocated." - .format(n=n_jobs, c=max_num_concurrent_tasks)) + # But if cluster in dynamic allocation mode and available workers is zero + # then use spark_default_parallelism and trigger spark worker dynamic allocation + n_jobs = self._decide_parallelism( + requested_parallelism=n_jobs, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) return n_jobs def _get_max_num_concurrent_tasks(self): From e442b0af481f105b3c0b28225796b64f5efe26a1 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Thu, 25 Jun 2020 11:30:35 +0800 Subject: [PATCH 2/5] update --- joblibspark/backend.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index 51181d3..55836b1 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -17,6 +17,8 @@ """ The joblib spark backend implementation. """ + +# pylint: disable=W0621,W1202 import sys import logging import warnings @@ -96,9 +98,9 @@ def _cancel_all_jobs(self): self._spark.sparkContext.cancelJobGroup(self._job_group) @staticmethod - def _decide_parallelism( - requested_parallelism, spark_default_parallelism, max_num_concurrent_tasks - ): + def _decide_parallelism(requested_parallelism, + spark_default_parallelism, + max_num_concurrent_tasks): """ Given the requested parallelism, return the max parallelism SparkTrials will actually use. See the docstring for `parallelism` in the constructor for expected behavior. @@ -108,8 +110,9 @@ def _decide_parallelism( "The cluster has no executors currently. " "The trials won't start until some new executors register." ) - - if requested_parallelism is None or requested_parallelism <= 0: + if requested_parallelism is None: + requested_parallelism = 1 + elif requested_parallelism <= 0: parallelism = max(spark_default_parallelism, max_num_concurrent_tasks, 1) logger.warning( "Because the requested parallelism was None or a non-positive value, " @@ -143,20 +146,19 @@ def _decide_parallelism( return parallelism def effective_n_jobs(self, n_jobs): + """ + n_jobs is None will request 1 worker. + n_jobs=-1 means requesting all available workers, + but if cluster in dynamic allocation mode and available workers is zero + then use spark_default_parallelism and trigger spark worker dynamic allocation + """ max_num_concurrent_tasks = self._get_max_num_concurrent_tasks() - spark_default_parallelism = self._spark_context.defaultParallelism - if n_jobs is None: - n_jobs = 1 - elif n_jobs == -1: - # n_jobs=-1 means requesting all available workers - # But if cluster in dynamic allocation mode and available workers is zero - # then use spark_default_parallelism and trigger spark worker dynamic allocation - n_jobs = self._decide_parallelism( - requested_parallelism=n_jobs, - spark_default_parallelism=spark_default_parallelism, - max_num_concurrent_tasks=max_num_concurrent_tasks, - ) - return n_jobs + spark_default_parallelism = self._spark.sparkContext.defaultParallelism + return self._decide_parallelism( + requested_parallelism=n_jobs, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) def _get_max_num_concurrent_tasks(self): # maxNumConcurrentTasks() is a package private API From 8e63906cde8951af1615945c5394930c33ec0f02 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Thu, 25 Jun 2020 11:35:48 +0800 Subject: [PATCH 3/5] update --- joblibspark/backend.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index 55836b1..c89df37 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -130,9 +130,8 @@ def _decide_parallelism(requested_parallelism, if parallelism > SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED: logger.warning( - "Parallelism ({p}) is capped at SparkTrials.MAX_CONCURRENT_JOBS_ALLOWED ({c}).".format( - p=parallelism, c=SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED - ) + "Parallelism ({p}) is capped at SparkTrials.MAX_CONCURRENT_JOBS_ALLOWED ({c})." + .format(p=parallelism, c=SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED) ) parallelism = SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED From ef1b292907ffca52c55579f5a783b405ce0b373a Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Thu, 25 Jun 2020 11:57:09 +0800 Subject: [PATCH 4/5] update --- joblibspark/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index c89df37..b86b47b 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -111,7 +111,7 @@ def _decide_parallelism(requested_parallelism, "The trials won't start until some new executors register." ) if requested_parallelism is None: - requested_parallelism = 1 + parallelism = 1 elif requested_parallelism <= 0: parallelism = max(spark_default_parallelism, max_num_concurrent_tasks, 1) logger.warning( From 153e9b27c8cb807d7c7120c41c8def1b883c87da Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Thu, 25 Jun 2020 12:55:51 +0800 Subject: [PATCH 5/5] update test --- test/test_backend.py | 68 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/test/test_backend.py b/test/test_backend.py index d000f06..52e113f 100644 --- a/test/test_backend.py +++ b/test/test_backend.py @@ -1,9 +1,26 @@ -import warnings +import contextlib +import logging from unittest.mock import MagicMock +from six import StringIO from joblibspark.backend import SparkDistributedBackend +@contextlib.contextmanager +def patch_logger(name, level=logging.INFO): + """patch logger and give an output""" + io_out = StringIO() + log = logging.getLogger(name) + log.setLevel(level) + log.handlers = [] + handler = logging.StreamHandler(io_out) + log.addHandler(handler) + try: + yield io_out + finally: + log.removeHandler(handler) + + def test_effective_n_jobs(): backend = SparkDistributedBackend() @@ -13,8 +30,49 @@ def test_effective_n_jobs(): assert backend.effective_n_jobs(n_jobs=None) == 1 assert backend.effective_n_jobs(n_jobs=-1) == 8 assert backend.effective_n_jobs(n_jobs=4) == 4 + assert backend.effective_n_jobs(n_jobs=16) == 16 + + +def test_parallelism_arg(): + for spark_default_parallelism, max_num_concurrent_tasks in [(2, 4), (2, 0)]: + default_parallelism = max(spark_default_parallelism, max_num_concurrent_tasks) + + assert 1 == SparkDistributedBackend._decide_parallelism( + requested_parallelism=None, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) + with patch_logger("joblib-spark") as output: + parallelism = SparkDistributedBackend._decide_parallelism( + requested_parallelism=-1, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) + assert parallelism == default_parallelism + log_output = output.getvalue().strip() + assert "Because the requested parallelism was None or a non-positive value, " \ + "parallelism will be set to ({d})".format(d=default_parallelism) in log_output + + # Test requested_parallelism which will trigger spark executor dynamic allocation. + with patch_logger("joblib-spark") as output: + parallelism = SparkDistributedBackend._decide_parallelism( + requested_parallelism=max_num_concurrent_tasks + 1, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) + assert parallelism == max_num_concurrent_tasks + 1 + log_output = output.getvalue().strip() + assert "Parallelism ({p}) is greater".format(p=max_num_concurrent_tasks + 1) \ + in log_output - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - assert backend.effective_n_jobs(n_jobs=16) == 16 - assert len(w) == 1 + # Test requested_parallelism exceeds hard cap + with patch_logger("joblib-spark") as output: + parallelism = SparkDistributedBackend._decide_parallelism( + requested_parallelism=SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED + 1, + spark_default_parallelism=spark_default_parallelism, + max_num_concurrent_tasks=max_num_concurrent_tasks, + ) + assert parallelism == SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED + log_output = output.getvalue().strip() + assert "SparkTrials.MAX_CONCURRENT_JOBS_ALLOWED ({c})" \ + .format(c=SparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED) in log_output