aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Zhang <zjffdu@apache.org>2016-02-29 12:08:37 +0000
committerSean Owen <srowen@percolateur.local>2016-02-29 12:08:37 +0000
commit99fe8993f51d3c72cd95eb0825b090dd4d4cd2cd (patch)
treefbe325bdd51ab5e321704f54e99c102707ca97fb
parentd81a71357e24160244b6eeff028b0d9a4863becf (diff)
downloadspark-99fe8993f51d3c72cd95eb0825b090dd4d4cd2cd.tar.gz
spark-99fe8993f51d3c72cd95eb0825b090dd4d4cd2cd.tar.bz2
spark-99fe8993f51d3c72cd95eb0825b090dd4d4cd2cd.zip
[SPARK-12994][CORE] It is not necessary to create ExecutorAllocationM…
…anager in local mode Author: Jeff Zhang <zjffdu@apache.org> Closes #10914 from zjffdu/SPARK-12994.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala19
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala3
3 files changed, 21 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1fa266e18..0e8b735b92 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
- def isLocal: Boolean = (master == "local" || master.startsWith("local["))
+ def isLocal: Boolean = Utils.isLocalMaster(_conf)
/**
* @return true if context is stopped or in the midst of stopping.
@@ -526,10 +526,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
- if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
- logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
- }
-
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e0c9bf02a1..6103a10ccc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2195,6 +2195,16 @@ private[spark] object Utils extends Logging {
isInDirectory(parent, child.getParentFile)
}
+
+ /**
+ *
+ * @return whether it is local mode
+ */
+ def isLocalMaster(conf: SparkConf): Boolean = {
+ val master = conf.get("spark.master", "")
+ master == "local" || master.startsWith("local[")
+ }
+
/**
* Return whether dynamic allocation is enabled in the given conf
* Dynamic allocation and explicitly setting the number of executors are inherently
@@ -2202,8 +2212,13 @@ private[spark] object Utils extends Logging {
* the latter should override the former (SPARK-9092).
*/
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
- conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
- conf.getInt("spark.executor.instances", 0) == 0
+ val numExecutor = conf.getInt("spark.executor.instances", 0)
+ val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
+ if (numExecutor != 0 && dynamicAllocationEnabled) {
+ logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+ }
+ numExecutor == 0 && dynamicAllocationEnabled &&
+ (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
}
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 7c6778b065..412c0ac9d9 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -722,6 +722,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("isDynamicAllocationEnabled") {
val conf = new SparkConf()
+ conf.set("spark.master", "yarn-client")
assert(Utils.isDynamicAllocationEnabled(conf) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "false")) === false)
@@ -731,6 +732,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set("spark.executor.instances", "1")) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.executor.instances", "0")) === true)
+ assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
+ assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
}
test("encodeFileNameToURIRawPath") {