diff options
author | Matei Zaharia <matei@databricks.com> | 2014-01-07 19:30:23 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-01-07 19:30:23 -0500 |
commit | 2c421749eae1e3945ca34ce006addd98a0c1a00b (patch) | |
tree | 0390de796eabebd84a1016a04b19324f8e9212b9 /core | |
parent | 044c8ad3a47d245198a16a68e36a417e80e8c37e (diff) | |
download | spark-2c421749eae1e3945ca34ce006addd98a0c1a00b.tar.gz spark-2c421749eae1e3945ca34ce006addd98a0c1a00b.tar.bz2 spark-2c421749eae1e3945ca34ce006addd98a0c1a00b.zip |
Address review comments
Diffstat (limited to 'core')
7 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 19d393a0db..e38459b883 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy private[spark] class ApplicationDescription( val name: String, - val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */ + val maxCores: Option[Int], val memoryPerSlave: Int, val command: Command, val sparkHome: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index ef649fd80c..28ebbdc66b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -48,7 +48,7 @@ private[spark] object TestClient { val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = new SparkConf) val desc = new ApplicationDescription( - "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored") val listener = new TestListener val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 1321d9200b..3e26379166 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -82,7 +82,7 @@ private[spark] class ApplicationInfo( } } - private val myMaxCores = if (desc.maxCores == Int.MaxValue) defaultCores else desc.maxCores + private val myMaxCores = desc.maxCores.getOrElse(defaultCores) def coresLeft: Int = myMaxCores - coresGranted diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index ee01fb11df..6617b7100f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -92,6 +92,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue) val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) + if (defaultCores < 1) { + throw new SparkException("spark.deploy.defaultCores must be positive") + } override def preStart() { logInfo("Starting Spark master at " + masterUrl) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9858717d13..73fc37444e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend( var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.getOption("spark.cores.max").map(_.toInt) override def start() { super.start() diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 810ebf4140..331fa3a642 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -70,7 +70,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc() : ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map()) - new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl") } def createAppInfo() : ApplicationInfo = { new ApplicationInfo( diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 7e5aaa3f98..be93074b7b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -27,7 +27,7 @@ class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get - val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()), + val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), |