diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-04-07 13:06:30 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-07 13:06:30 -0700 |
commit | 0307db0f55b714930c7ea118d5451190ea8c1a94 (patch) | |
tree | aef07717fd1658760a51d77d2b22445bbfe9921e /core | |
parent | 2a2ca48be61ed0d72c4347e1c042a264b94db3e8 (diff) | |
download | spark-0307db0f55b714930c7ea118d5451190ea8c1a94.tar.gz spark-0307db0f55b714930c7ea118d5451190ea8c1a94.tar.bz2 spark-0307db0f55b714930c7ea118d5451190ea8c1a94.zip |
SPARK-1099: Introduce local[*] mode to infer number of cores
This is the default mode for running spark-shell and pyspark, intended to allow users running spark for the first time to see the performance benefits of using multiple cores, while not breaking backwards compatibility for users who use "local" mode and expect exactly 1 core.
Author: Aaron Davidson <aaron@databricks.com>
Closes #182 from aarondav/110 and squashes the following commits:
a88294c [Aaron Davidson] Rebased changes for new spark-shell
a9f393e [Aaron Davidson] SPARK-1099: Introduce local[*] mode to infer number of cores
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 9 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala | 8 |
2 files changed, 14 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8382dd44f3..e5ebd350ee 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1285,8 +1285,8 @@ object SparkContext extends Logging { /** Creates a task scheduler based on a given master URL. Extracted for testing. */ private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = { - // Regular expression used for local[N] master format - val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r + // Regular expression used for local[N] and local[*] master formats + val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally @@ -1309,8 +1309,11 @@ object SparkContext extends Logging { scheduler case LOCAL_N_REGEX(threads) => + def localCpuCount = Runtime.getRuntime.availableProcessors() + // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. + val threadCount = if (threads == "*") localCpuCount else threads.toInt val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) - val backend = new LocalBackend(scheduler, threads.toInt) + val backend = new LocalBackend(scheduler, threadCount) scheduler.initialize(backend) scheduler diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index b543471a5d..94fba10286 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -51,6 +51,14 @@ class SparkContextSchedulerCreationSuite } } + test("local-*") { + val sched = createTaskScheduler("local[*]") + sched.backend match { + case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors()) + case _ => fail() + } + } + test("local-n") { val sched = createTaskScheduler("local[5]") assert(sched.maxTaskFailures === 1) |