author     Dhruve Ashar <dashar@yahoo-inc.com>    2016-09-22 10:10:37 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>   2016-09-22 10:10:37 -0700
commit     17b72d31e0c59711eddeb525becb8085930eadcc (patch)
tree       89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7
parent     8a02410a92429bff50d6ce082f873cea9e9fa91e (diff)
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?

We kill multiple executors together in a single request instead of iterating over expensive RPC calls that kill one executor at a time.

## How was this patch tested?

Executed a sample Spark job with dynamic allocation enabled and observed executors being killed/removed in batches.

Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15152 from dhruve/impr/SPARK-17365.
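To make the intent concrete, here is a minimal, self-contained sketch of the batching idea. It is illustrative only: `AllocationClientSketch` and `BatchedKillExample` are made-up names and the toy client simply acknowledges every id; the actual interface is the `ExecutorAllocationClient` trait changed in the diff below.

```scala
// Illustrative sketch, not code from this patch: one batched call returning
// the acknowledged ids, instead of one Boolean-returning RPC per executor.
trait AllocationClientSketch {
  def killExecutors(executorIds: Seq[String]): Seq[String]
  // Single-executor kill derived from the batched call.
  def killExecutor(executorId: String): Boolean =
    killExecutors(Seq(executorId)).contains(executorId)
}

object BatchedKillExample extends App {
  // Toy client that acknowledges every requested id in a single round trip.
  private val client = new AllocationClientSketch {
    def killExecutors(executorIds: Seq[String]): Seq[String] = {
      println(s"one kill request for: ${executorIds.mkString(", ")}")
      executorIds
    }
  }

  val idleExecutors = Seq("1", "2", "3")
  // Before: idleExecutors.foreach(client.killExecutor)  -> one RPC per executor
  // After:  a single batched call covers the whole set.
  val removed = client.killExecutors(idleExecutors)
  println(s"acknowledged for removal: ${removed.mkString(", ")}")
}
```

The batched call also reports which ids were actually accepted, so a caller can track partial success rather than a single pass/fail flag.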
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala                               |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                              |  86
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                           |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala        |  12
-rw-r--r--  core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager       |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala                         | 135
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala                |   6
-rw-r--r--  project/MimaExcludes.scala                                                                         |   3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala     |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala                  |   9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala |   5
11 files changed, 239 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 8baddf45bf..5d47f624ac 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the request is acknowledged by the cluster manager.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- def killExecutors(executorIds: Seq[String]): Boolean
+ def killExecutors(executorIds: Seq[String]): Seq[String]
/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
- def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+ def killExecutor(executorId: String): Boolean = {
+ val killedExecutors = killExecutors(Seq(executorId))
+ killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
+ }
}
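Because `killExecutors` now returns the acknowledged ids rather than a Boolean, callers can distinguish partial success from total failure. A hedged sketch of such a caller (the helper, its name, and its parameters are hypothetical, not part of this patch):

```scala
import scala.collection.mutable

// Hypothetical helper: record only the executors the cluster manager actually
// acknowledged, and report the ones it dropped from the batch.
object PendingRemovalSketch {
  def markPendingRemoval(
      killExecutors: Seq[String] => Seq[String],  // e.g. the client's batched kill
      requested: Seq[String],
      pendingToRemove: mutable.Set[String]): Unit = {
    val acknowledged = killExecutors(requested)
    pendingToRemove ++= acknowledged
    val rejected = requested.diff(acknowledged)
    if (rejected.nonEmpty) {
      println(s"not acknowledged by the cluster manager: ${rejected.mkString(", ")}")
    }
  }
}
```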
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6f320c5242..1366251d06 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable
import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
updateAndSyncNumExecutorsTarget(now)
+ val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
- removeExecutor(executorId)
+ executorIdsToBeRemoved += executorId
}
!expired
}
+ if (executorIdsToBeRemoved.nonEmpty) {
+ removeExecutors(executorIdsToBeRemoved)
+ }
}
/**
@@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Request the cluster manager to remove the given executors.
+ * Returns the list of executors which are removed.
+ */
+ private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+ val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+ logInfo("Request to remove executorIds: " + executors.mkString(", "))
+ val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+
+ var newExecutorTotal = numExistingExecutors
+ executors.foreach { executorIdToBeRemoved =>
+ if (newExecutorTotal - 1 < minNumExecutors) {
+ logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
+ s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+ } else if (canBeKilled(executorIdToBeRemoved)) {
+ executorIdsToBeRemoved += executorIdToBeRemoved
+ newExecutorTotal -= 1
+ }
+ }
+
+ if (executorIdsToBeRemoved.isEmpty) {
+ return Seq.empty[String]
+ }
+
+ // Send a request to the backend to kill these executors
+ val executorsRemoved = if (testing) {
+ executorIdsToBeRemoved
+ } else {
+ client.killExecutors(executorIdsToBeRemoved)
+ }
+ // reset the newExecutorTotal to the existing number of executors
+ newExecutorTotal = numExistingExecutors
+ if (testing || executorsRemoved.nonEmpty) {
+ executorsRemoved.foreach { removedExecutorId =>
+ newExecutorTotal -= 1
+ logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+ s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+ executorsPendingToRemove.add(removedExecutorId)
+ }
+ executorsRemoved
+ } else {
+ logWarning(s"Unable to reach the cluster manager to kill executor/s " +
+ "executorIdsToBeRemoved.mkString(\",\") or no executor eligible to kill!")
+ Seq.empty[String]
+ }
+ }
+
+ /**
* Request the cluster manager to remove the given executor.
- * Return whether the request is received.
+ * Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
+ val executorsRemoved = removeExecutors(Seq(executorId))
+ executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+ }
+
+ /**
+ * Determine if the given executor can be killed.
+ */
+ private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
return false
}
- // Do not kill the executor if we have already reached the lower bound
- val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
- if (numExistingExecutors - 1 < minNumExecutors) {
- logDebug(s"Not removing idle executor $executorId because there are only " +
- s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
- return false
- }
-
- // Send a request to the backend to kill this executor
- val removeRequestAcknowledged = testing || client.killExecutor(executorId)
- if (removeRequestAcknowledged) {
- logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
- executorsPendingToRemove.add(executorId)
- true
- } else {
- logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
- s"or no executor eligible to kill!")
- false
- }
+ true
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1981ad5671..f58037e100 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -73,7 +73,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
-class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
- Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+ schedulerBackend match {
+ case b: ExecutorAllocationClient =>
+ Some(new ExecutorAllocationManager(
+ schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+ case _ =>
+ None
+ }
} else {
None
}
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}
- private[spark] override def getExecutorIds(): Seq[String] = {
+ private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is acknowledged by the cluster manager.
*/
@DeveloperApi
- override def requestTotalExecutors(
+ def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+ def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutors(executorIds: Seq[String]): Boolean = {
+ def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(executorIds, replace = false, force = true)
+ b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
+ def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
/**
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(Seq(executorId), replace = true, force = true)
+ b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b3fdf439..edc3c19937 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
- final override def killExecutors(executorIds: Seq[String]): Boolean = {
+ final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}
@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
- force: Boolean): Boolean = {
+ force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+ logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
+
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
_ => Future.successful(false)
}
- adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+ val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+ killResponse.flatMap(killSuccessful =>
+ Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
+ )(ThreadUtils.sameThread)
}
defaultAskTimeout.awaitResult(response)
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
index 757c6d2296..cf8565c74e 100644
--- a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -1,2 +1,3 @@
org.apache.spark.scheduler.DummyExternalClusterManager
-org.apache.spark.scheduler.MockExternalClusterManager
\ No newline at end of file
+org.apache.spark.scheduler.MockExternalClusterManager
+org.apache.spark.DummyLocalExternalClusterManager
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c130649830..ec409712b9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,7 +23,9 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.util.ManualClock
/**
@@ -49,7 +51,7 @@ class ExecutorAllocationManagerSuite
test("verify min/max executors") {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
@@ -263,6 +265,55 @@ class ExecutorAllocationManagerSuite
assert(executorsPendingToRemove(manager).isEmpty)
}
+ test("remove multiple executors") {
+ sc = createSparkContext(5, 10, 5)
+ val manager = sc.executorAllocationManager.get
+ (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+ // Keep removing until the limit is reached
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutors(manager, Seq("1")) === Seq("1"))
+ assert(executorsPendingToRemove(manager).size === 1)
+ assert(executorsPendingToRemove(manager).contains("1"))
+ assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(executorsPendingToRemove(manager).contains("2"))
+ assert(executorsPendingToRemove(manager).contains("3"))
+ assert(!removeExecutor(manager, "100")) // remove non-existent executors
+ assert(removeExecutors(manager, Seq("101", "102")) !== Seq("101", "102"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(removeExecutor(manager, "4"))
+ assert(removeExecutors(manager, Seq("5")) === Seq("5"))
+ assert(!removeExecutor(manager, "6")) // reached the limit of 5
+ assert(executorsPendingToRemove(manager).size === 5)
+ assert(executorsPendingToRemove(manager).contains("4"))
+ assert(executorsPendingToRemove(manager).contains("5"))
+ assert(!executorsPendingToRemove(manager).contains("6"))
+
+ // Kill executors previously requested to remove
+ onExecutorRemoved(manager, "1")
+ assert(executorsPendingToRemove(manager).size === 4)
+ assert(!executorsPendingToRemove(manager).contains("1"))
+ onExecutorRemoved(manager, "2")
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ assert(!executorsPendingToRemove(manager).contains("2"))
+ assert(!executorsPendingToRemove(manager).contains("3"))
+ onExecutorRemoved(manager, "2") // duplicates should not count
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ onExecutorRemoved(manager, "4")
+ onExecutorRemoved(manager, "5")
+ assert(executorsPendingToRemove(manager).isEmpty)
+
+ // Try removing again
+ // This should still fail because the number pending + running is still at the limit
+ assert(!removeExecutor(manager, "7"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutors(manager, Seq("8")) !== Seq("8"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ }
+
test ("interleaving add and remove") {
sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
@@ -283,8 +334,7 @@ class ExecutorAllocationManagerSuite
// Remove until limit
assert(removeExecutor(manager, "1"))
- assert(removeExecutor(manager, "2"))
- assert(removeExecutor(manager, "3"))
+ assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
assert(!removeExecutor(manager, "4")) // lower limit reached
assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
@@ -296,7 +346,7 @@ class ExecutorAllocationManagerSuite
assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
assert(!removeExecutor(manager, "4")) // still at lower limit
- assert(!removeExecutor(manager, "5"))
+ assert(removeExecutors(manager, Seq("5")) !== Seq("5"))
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
@@ -305,9 +355,7 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager).size === 10)
// Remove succeeds again, now that we are no longer at the lower limit
- assert(removeExecutor(manager, "4"))
- assert(removeExecutor(manager, "5"))
- assert(removeExecutor(manager, "6"))
+ assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
assert(addExecutors(manager) === 0)
@@ -870,8 +918,8 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
removeExecutor(manager, "first")
- removeExecutor(manager, "second")
- assert(executorsPendingToRemove(manager) === Set("first", "second"))
+ removeExecutors(manager, Seq("second", "third"))
+ assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
@@ -895,7 +943,7 @@ class ExecutorAllocationManagerSuite
maxExecutors: Int = 5,
initialExecutors: Int = 1): SparkContext = {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
@@ -953,6 +1001,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _updateAndSyncNumExecutorsTarget =
PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+ private val _removeExecutors = PrivateMethod[Seq[String]]('removeExecutors)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
@@ -1008,6 +1057,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _removeExecutor(id)
}
+ private def removeExecutors(manager: ExecutorAllocationManager, ids: Seq[String]): Seq[String] = {
+ manager invokePrivate _removeExecutors(ids)
+ }
+
private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
manager invokePrivate _onExecutorAdded(id)
}
@@ -1040,3 +1093,65 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _hostToLocalTaskCount()
}
}
+
+/**
+ * A cluster manager which wraps around the scheduler and backend for local mode. It is used for
+ * testing the dynamic allocation policy.
+ */
+private class DummyLocalExternalClusterManager extends ExternalClusterManager {
+
+ def canCreate(masterURL: String): Boolean = masterURL == "myDummyLocalExternalClusterManager"
+
+ override def createTaskScheduler(
+ sc: SparkContext,
+ masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+
+ override def createSchedulerBackend(
+ sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ val sb = new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)
+ new DummyLocalSchedulerBackend(sc, sb)
+ }
+
+ override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
+ sc.initialize(backend)
+ }
+}
+
+/**
+ * A scheduler backend which wraps around local scheduler backend and exposes the executor
+ * allocation client interface for testing dynamic allocation.
+ */
+private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend)
+ extends SchedulerBackend with ExecutorAllocationClient {
+
+ override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds()
+
+ override private[spark] def requestTotalExecutors(
+ numExecutors: Int,
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int]): Boolean =
+ sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+
+ override def requestExecutors(numAdditionalExecutors: Int): Boolean =
+ sc.requestExecutors(numAdditionalExecutors)
+
+ override def killExecutors(executorIds: Seq[String]): Seq[String] = {
+ val response = sc.killExecutors(executorIds)
+ if (response) {
+ executorIds
+ } else {
+ Seq.empty[String]
+ }
+ }
+
+ override def start(): Unit = sb.start()
+
+ override def stop(): Unit = sb.stop()
+
+ override def reviveOffers(): Unit = sb.reviveOffers()
+
+ override def defaultParallelism(): Int = sb.defaultParallelism()
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 814027076d..e29eb8552e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -438,12 +438,12 @@ class StandaloneDynamicAllocationSuite
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
- assert(!killExecutor(sc, executors.head, force = false))
+ assert(killExecutor(sc, executors.head, force = false).isEmpty)
apps = getApplications()
assert(apps.head.executors.size === 2)
// force kill busy executor
- assert(killExecutor(sc, executors.head, force = true))
+ assert(killExecutor(sc, executors.head, force = true).nonEmpty)
apps = getApplications()
// kill executor successfully
assert(apps.head.executors.size === 1)
@@ -518,7 +518,7 @@ class StandaloneDynamicAllocationSuite
}
/** Kill the given executor, specifying whether to force kill it. */
- private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+ private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Seq[String] = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f13f3ff789..0a56a6b19e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -818,6 +818,9 @@ object MimaExcludes {
) ++ Seq(
// [SPARK-17017] Add chiSquare selector based on False Positive Rate (FPR) test
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.ChiSqSelectorModel.isSorted")
+ ) ++ Seq(
+ // [SPARK-17365][Core] Remove/Kill multiple executors together to reduce RPC call time
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkContext")
)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index fb5587edec..7b29b40668 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -226,7 +226,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
conf: SparkConf,
batchDurationMs: Long,
clock: Clock): Option[ExecutorAllocationManager] = {
- if (isDynamicAllocationEnabled(conf)) {
+ if (isDynamicAllocationEnabled(conf) && client != null) {
Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock))
} else None
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 79d6254eb3..dbc50da21c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,6 +24,7 @@ import scala.util.Failure
import org.apache.commons.lang3.SerializationUtils
+import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
+
+ val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
+ case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
+ case _ => null
+ }
+
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
- ssc.sparkContext,
+ executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7630f4a75e..b49e579071 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -380,8 +380,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
- conf.setMaster("local").setAppName(this.getClass.getSimpleName).set(
- "spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation
+ conf.setMaster("myDummyLocalExternalClusterManager")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation
var ssc: StreamingContext = null
try {