author     Aaron Davidson <aaron@databricks.com>    2014-04-07 13:06:30 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-04-07 13:06:30 -0700
commit     0307db0f55b714930c7ea118d5451190ea8c1a94 (patch)
tree       aef07717fd1658760a51d77d2b22445bbfe9921e /core
parent     2a2ca48be61ed0d72c4347e1c042a264b94db3e8 (diff)
SPARK-1099: Introduce local[*] mode to infer number of cores
This is the default mode for running spark-shell and pyspark, intended to allow users running spark for the first time to see the performance benefits of using multiple cores, while not breaking backwards compatibility for users who use "local" mode and expect exactly 1 core.

Author: Aaron Davidson <aaron@databricks.com>

Closes #182 from aarondav/110 and squashes the following commits:

a88294c [Aaron Davidson] Rebased changes for new spark-shell
a9f393e [Aaron Davidson] SPARK-1099: Introduce local[*] mode to infer number of cores
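As a rough illustration (not part of this commit; the object and application name below are placeholders), an application opts into the new behavior by setting its master URL to "local[*]", while "local[N]" still requests exactly N threads:

import org.apache.spark.{SparkConf, SparkContext}

object LocalStarExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("local-star-example") // placeholder name
      .setMaster("local[*]")            // one worker thread per available core
    val sc = new SparkContext(conf)
    // In local mode, defaultParallelism reflects the inferred thread count
    println(s"defaultParallelism = ${sc.defaultParallelism}")
    sc.stop()
  }
}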
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                       | 9 ++++++---
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala | 8 ++++++++
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8382dd44f3..e5ebd350ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1285,8 +1285,8 @@ object SparkContext extends Logging {
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
   private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression used for local[N] and local[*] master formats
+    val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
     val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
@@ -1309,8 +1309,11 @@ object SparkContext extends Logging {
         scheduler
 
       case LOCAL_N_REGEX(threads) =>
+        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
+        val threadCount = if (threads == "*") localCpuCount else threads.toInt
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, threads.toInt)
+        val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
 
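The effect of the widened regex can be sketched with the hypothetical standalone helper below (not code from this commit); it applies the same pattern and "*" handling outside of SparkContext:

object LocalMasterParsing {
  // Same regex as the patched SparkContext: matches local[N] and local[*]
  private val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r

  def threadsFor(master: String): Option[Int] = master match {
    case LOCAL_N_REGEX(threads) =>
      Some(if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt)
    case _ => None
  }
}

// threadsFor("local[*]") -> Some(<available cores>)
// threadsFor("local[4]") -> Some(4)
// threadsFor("local")    -> None (handled by a separate case in createTaskScheduler)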
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index b543471a5d..94fba10286 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -51,6 +51,14 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-*") {
+    val sched = createTaskScheduler("local[*]")
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+      case _ => fail()
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)