aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqqsun8819 <jin.oyj@alibaba-inc.com>2014-03-19 16:33:54 -0700
committerAaron Davidson <aaron@databricks.com>2014-03-19 16:33:54 -0700
commit16789317a34c1974f7b35960f06a7b51d8e0f29f (patch)
tree93f27f88cc87fb5157f39776046d2104516d39a7
parent67fa71cba2cc07a65478899592e6ebad000e24c5 (diff)
downloadspark-16789317a34c1974f7b35960f06a7b51d8e0f29f.tar.gz
spark-16789317a34c1974f7b35960f06a7b51d8e0f29f.tar.bz2
spark-16789317a34c1974f7b35960f06a7b51d8e0f29f.zip
SPARK-1099:Spark's local mode should probably respect spark.cores.max by default
This is for JIRA:https://spark-project.atlassian.net/browse/SPARK-1099 And this is what I do in this patch (also commented in the JIRA) @aarondav This is really a behavioral change, so I do this with great caution, and welcome any review advice: 1 I change the "MASTER=local" pattern of create LocalBackEnd . In the past, we passed 1 core to it . now it use a default cores The reason here is that when someone use spark-shell to start local mode , Repl will use this "MASTER=local" pattern as default. So if one also specify cores in the spark-shell command line, it will all go in here. So here pass 1 core is not suitalbe reponding to our change here. 2 In the LocalBackEnd , the "totalCores" variable are fetched following a different rule(in the past it just take in a userd passed cores, like 1 in "MASTER=local" pattern, 2 in "MASTER=local[2]" pattern" rules: a The second argument of LocalBackEnd 's constructor indicating cores have a default value which is Int.MaxValue. If user didn't pass it , its first default value is Int.MaxValue b In getMaxCores, we first compare the former value to Int.MaxValue. if it's not equal, we think that user has passed their desired value, so just use it c. If b is not satified, we then get cores from spark.cores.max, and we get real logical cores from Runtime. And if cores specified by spark.cores.max is bigger than logical cores, we use logical cores, otherwise we use spark.cores.max 3 In SparkContextSchedulerCreationSuite 's test("local") case, assertion is modified from 1 to logical cores, because "MASTER=local" pattern use default vaules. Author: qqsun8819 <jin.oyj@alibaba-inc.com> Closes #110 from qqsun8819/local-cores and squashes the following commits: 731aefa [qqsun8819] 1 LocalBackend not change 2 In SparkContext do some process to the cores and pass it to original LocalBackend constructor 78b9c60 [qqsun8819] 1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern 6ae1ee8 [qqsun8819] Add a static function in LocalBackEnd to let it use spark.cores.max specified cores when no cores are passed to it
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/FileSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala19
3 files changed, 22 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1003b7925..8f74607278 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1262,7 +1262,10 @@ object SparkContext extends Logging {
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(scheduler, 1)
+ // Use user specified in config, up to all available cores
+ val realCores = Runtime.getRuntime.availableProcessors()
+ val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
+ val backend = new LocalBackend(scheduler, toUseCores)
scheduler.initialize(backend)
scheduler
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 01af940771..b4a5881cd9 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._
class FileSuite extends FunSuite with LocalSparkContext {
test("text files") {
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local[1]", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
@@ -176,7 +176,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local[1]", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index b543471a5d..9dd42be1d7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
- def createTaskScheduler(master: String): TaskSchedulerImpl = {
+ def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
}
test("local") {
- val sched = createTaskScheduler("local")
+ var conf = new SparkConf()
+ conf.set("spark.cores.max", "1")
+ val sched = createTaskScheduler("local", conf)
sched.backend match {
case s: LocalBackend => assert(s.totalCores === 1)
case _ => fail()
}
}
+ test("local-cores-exceed") {
+ val cores = Runtime.getRuntime.availableProcessors() + 1
+ var conf = new SparkConf()
+ conf.set("spark.cores.max", cores.toString)
+ val sched = createTaskScheduler("local", conf)
+ sched.backend match {
+ case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+ case _ => fail()
+ }
+ }
+
test("local-n") {
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 1)