path: root/core/src
author     Matei Zaharia <matei@databricks.com>    2014-01-08 00:30:03 -0500
committer  Matei Zaharia <matei@databricks.com>    2014-01-08 00:30:03 -0500
commit     d75dc428dae72add407aec2c134a25537e754303 (patch)
tree       2c6fb6bfe806c9bf5c4084bfea383d6cef4caace /core/src
parent     61674bcadfe64e0d8e05f3daab3274af3023dffa (diff)
parent     2c421749eae1e3945ca34ce006addd98a0c1a00b (diff)
Merge pull request #350 from mateiz/standalone-limit
Add way to limit default # of cores used by apps in standalone mode.

Also documents the spark.deploy.spreadOut option, and fixes a config option that had a dash in its name.
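To illustrate the behaviour this merge introduces, here is a minimal sketch of a standalone-mode application that sets its own per-app cap; applications that leave spark.cores.max unset now fall back to the master's spark.deploy.defaultCores. The master URL and app name below are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object CoreLimitExample {
  def main(args: Array[String]): Unit = {
    // Cap this application at 4 cores; if spark.cores.max were left unset,
    // the standalone master would fall back to spark.deploy.defaultCores.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077") // placeholder master URL
      .setAppName("CoreLimitExample")        // placeholder app name
      .set("spark.cores.max", "4")
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 100).count())
    } finally {
      sc.stop()
    }
  }
}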
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                     |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala                 |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala                      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala                 |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                          | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala                      |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala              |  2
9 files changed, 28 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b166527614..2de32231e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -67,7 +67,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
- for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+ for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
}
@@ -165,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
+ /** Get a parameter as a boolean, falling back to a default if not set */
+ def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+ getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+ }
+
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
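The new getBoolean accessor mirrors the existing getInt/getDouble helpers. A small usage sketch follows; the configuration key is hypothetical.

import org.apache.spark.SparkConf

object GetBooleanSketch extends App {
  val conf = new SparkConf(false)            // don't load defaults from system properties
  conf.set("spark.example.verbose", "true")  // hypothetical key
  val verbose = conf.getBoolean("spark.example.verbose", defaultValue = false)
  val missing = conf.getBoolean("spark.example.unset", defaultValue = false)
  assert(verbose && !missing)
}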
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99dcced7d7..0e47f4e442 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
throw new SparkException("An application name must be set in your configuration")
}
- if (conf.get("spark.log-conf", "false").toBoolean) {
+ if (conf.get("spark.logConf", "false").toBoolean) {
logInfo("Spark configuration:\n" + conf.toDebugString)
}
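With the dash removed from the key, enabling configuration logging looks like the following sketch; the local master and app name are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object LogConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("LogConfSketch")
      .set("spark.logConf", "true")  // formerly read (incorrectly) as spark.log-conf
    val sc = new SparkContext(conf)  // logs "Spark configuration:" at INFO level
    sc.stop()
  }
}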
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a0db..e38459b883 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
private[spark] class ApplicationDescription(
val name: String,
- val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */
+ val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ef649fd80c..28ebbdc66b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,7 +48,7 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
"dummy-spark-home", "ignored")
val listener = new TestListener
val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c7de..3e26379166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
- val appUiUrl: String)
+ val appUiUrl: String,
+ defaultCores: Int)
extends Serializable {
@transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
}
}
- def coresLeft: Int = desc.maxCores - coresGranted
+ private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+
+ def coresLeft: Int = myMaxCores - coresGranted
private var _retryCount = 0
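The fallback itself is plain Option.getOrElse: a per-application limit wins, otherwise the master-wide default applies. A self-contained sketch of the precedence, with arbitrary numbers:

object CoreFallbackSketch extends App {
  // A per-application limit (spark.cores.max, now Option[Int]) takes precedence;
  // otherwise the master-wide spark.deploy.defaultCores applies.
  def effectiveMaxCores(requested: Option[Int], defaultCores: Int): Int =
    requested.getOrElse(defaultCores)

  assert(effectiveMaxCores(Some(4), 16) == 4)                    // app asked for 4 cores
  assert(effectiveMaxCores(None, 16) == 16)                      // fall back to master default
  assert(effectiveMaxCores(None, Int.MaxValue) == Int.MaxValue)  // unconfigured default: unlimited
}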
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cfcca..6617b7100f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+ // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+ val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ if (defaultCores < 1) {
+ throw new SparkException("spark.deploy.defaultCores must be positive")
+ }
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +432,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(
+ now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
}
def registerApplication(app: ApplicationInfo): Unit = {
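On the master side, both knobs are ordinary SparkConf entries (typically supplied as system properties when the standalone master starts). A sketch of how the master reads them, with illustrative values:

import org.apache.spark.SparkConf

object MasterConfigSketch extends App {
  val conf = new SparkConf(false)
    .set("spark.deploy.defaultCores", "16")  // cap apps that leave spark.cores.max unset
    .set("spark.deploy.spreadOut", "true")   // round-robin apps across workers

  assert(conf.getInt("spark.deploy.defaultCores", Int.MaxValue) == 16)
  assert(conf.getBoolean("spark.deploy.spreadOut", defaultValue = true))
}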
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9858717d13..73fc37444e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
override def start() {
super.start()
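For reference, the getOption-based read means an unset spark.cores.max now surfaces as None rather than Int.MaxValue; a tiny sketch:

import org.apache.spark.SparkConf

object MaxCoresOptionSketch extends App {
  val conf = new SparkConf(false)
  val unset: Option[Int] = conf.getOption("spark.cores.max").map(_.toInt)
  assert(unset.isEmpty)            // nothing requested; the master's default applies

  conf.set("spark.cores.max", "8")
  val capped = conf.getOption("spark.cores.max").map(_.toInt)
  assert(capped == Some(8))        // explicit per-app cap
}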
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 0b38e239f9..331fa3a642 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -70,10 +70,11 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc() : ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
- new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
- new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
+ new ApplicationInfo(
+ 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
}
def createWorkerInfo() : WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 7e5aaa3f98..be93074b7b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,7 +27,7 @@ class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
- val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
+ val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),