|  |  |  |
|---|---|---|
| author | Matei Zaharia <matei@databricks.com> | 2014-01-07 14:35:52 -0500 |
| committer | Matei Zaharia <matei@databricks.com> | 2014-01-07 14:35:52 -0500 |
| commit | d8bcc8e9a095c1b20dd7a17b6535800d39bff80e (patch) | |
| tree | f3f5a1368a43b765b541be706921903cc6ac8da0 /core | |
| parent | 15d953450167c4ec45c9d0a2c7ab8ee71be2e576 (diff) | |
Add way to limit default # of cores used by applications on standalone mode
Also documents the spark.deploy.spreadOut option.
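
For context, a minimal application-side sketch of how the new setting interacts with `spark.cores.max` (the master URL, app name, and numbers below are illustrative, not from this commit): an app that sets `spark.cores.max` keeps its explicit cap, while one that leaves it unset now falls back to the master's `spark.deploy.defaultCores` instead of claiming every available core.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone-mode application configuration.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")  // illustrative master URL
  .setAppName("CoreCapExample")           // illustrative app name
  .set("spark.cores.max", "2")            // explicit per-app cap; omit this line to
                                          // fall back to spark.deploy.defaultCores
val sc = new SparkContext(conf)
```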
Diffstat (limited to 'core')
4 files changed, 18 insertions, 6 deletions
```diff
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b166527614..2de32231e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -67,7 +67,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
 
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
-    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
     set("spark.jars", jars.filter(_ != null).mkString(","))
   }
 
@@ -165,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
     getOption(key).map(_.toDouble).getOrElse(defaultValue)
   }
 
+  /** Get a parameter as a boolean, falling back to a default if not set */
+  def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+    getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+  }
+
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99dcced7d7..0e47f4e442 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
-  if (conf.get("spark.log-conf", "false").toBoolean) {
+  if (conf.get("spark.logConf", "false").toBoolean) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c7de..1321d9200b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
     val desc: ApplicationDescription,
     val submitDate: Date,
     val driver: ActorRef,
-    val appUiUrl: String)
+    val appUiUrl: String,
+    defaultCores: Int)
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  def coresLeft: Int = desc.maxCores - coresGranted
+  private val myMaxCores = if (desc.maxCores == Int.MaxValue) defaultCores else desc.maxCores
+
+  def coresLeft: Int = myMaxCores - coresGranted
 
   private var _retryCount = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cfcca..ee01fb11df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+  val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +429,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(
+      now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
   }
 
   def registerApplication(app: ApplicationInfo): Unit = {
```
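
The core of the change is the fallback in `ApplicationInfo`: applications that never set `spark.cores.max` effectively request `Int.MaxValue` cores, and the master now substitutes its configured default before granting cores. A standalone plain-Scala sketch of that logic (the object and method names are illustrative, not part of Spark):

```scala
// Illustrative sketch of the core-cap fallback introduced by this patch.
object CoreCapSketch {
  // Apps that don't set spark.cores.max effectively ask for Int.MaxValue cores;
  // in that case the master's spark.deploy.defaultCores value is used instead.
  def resolveMaxCores(requestedMaxCores: Int, defaultCores: Int): Int =
    if (requestedMaxCores == Int.MaxValue) defaultCores else requestedMaxCores

  def main(args: Array[String]): Unit = {
    val defaultCores = 8                                  // e.g. spark.deploy.defaultCores = 8 on the master
    println(resolveMaxCores(Int.MaxValue, defaultCores))  // app left spark.cores.max unset -> 8
    println(resolveMaxCores(2, defaultCores))             // app asked for 2 cores -> 2
  }
}
```

Since `spark.deploy.defaultCores` itself defaults to `Int.MaxValue`, leaving it unset preserves the previous behaviour of handing each application all available cores.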