author    Dhruve Ashar <dashar@yahoo-inc.com>    2016-09-22 10:10:37 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>   2016-09-22 10:10:37 -0700
commit    17b72d31e0c59711eddeb525becb8085930eadcc (patch)
tree      89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7 /core/src/test/scala/org
parent    8a02410a92429bff50d6ce082f873cea9e9fa91e (diff)
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?

We now kill multiple executors together in a single request instead of iterating over expensive per-executor RPC calls.

## How was this patch tested?

Ran a sample Spark job with dynamic allocation enabled and observed executors being killed/removed in batches.

Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15152 from dhruve/impr/SPARK-17365.
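A rough sketch of the idea (not part of this patch): with dynamic allocation, the allocation manager can release a batch of executors with one call rather than looping over per-executor RPCs. The helper name releaseIdleExecutors and the executor ids below are hypothetical; SparkContext.killExecutor and SparkContext.killExecutors are the existing developer APIs being contrasted.

    import org.apache.spark.SparkContext

    // Hypothetical helper, for illustration only.
    def releaseIdleExecutors(sc: SparkContext, ids: Seq[String]): Unit = {
      // Old approach: one RPC round trip per executor id.
      // ids.foreach(id => sc.killExecutor(id))

      // New approach: a single batched request carrying all ids.
      sc.killExecutors(ids)
    }

    // Example usage (hypothetical ids):
    // releaseIdleExecutors(sc, Seq("1", "2", "3"))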
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala          | 135
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala |   6
2 files changed, 128 insertions, 13 deletions
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c130649830..ec409712b9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,7 +23,9 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.util.ManualClock
/**
@@ -49,7 +51,7 @@ class ExecutorAllocationManagerSuite
test("verify min/max executors") {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
@@ -263,6 +265,55 @@ class ExecutorAllocationManagerSuite
assert(executorsPendingToRemove(manager).isEmpty)
}
+ test("remove multiple executors") {
+ sc = createSparkContext(5, 10, 5)
+ val manager = sc.executorAllocationManager.get
+ (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+ // Keep removing until the limit is reached
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutors(manager, Seq("1")) === Seq("1"))
+ assert(executorsPendingToRemove(manager).size === 1)
+ assert(executorsPendingToRemove(manager).contains("1"))
+ assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(executorsPendingToRemove(manager).contains("2"))
+ assert(executorsPendingToRemove(manager).contains("3"))
+ assert(!removeExecutor(manager, "100")) // remove non-existent executors
+ assert(removeExecutors(manager, Seq("101", "102")) !== Seq("101", "102"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(removeExecutor(manager, "4"))
+ assert(removeExecutors(manager, Seq("5")) === Seq("5"))
+ assert(!removeExecutor(manager, "6")) // reached the limit of 5
+ assert(executorsPendingToRemove(manager).size === 5)
+ assert(executorsPendingToRemove(manager).contains("4"))
+ assert(executorsPendingToRemove(manager).contains("5"))
+ assert(!executorsPendingToRemove(manager).contains("6"))
+
+ // Kill executors previously requested to remove
+ onExecutorRemoved(manager, "1")
+ assert(executorsPendingToRemove(manager).size === 4)
+ assert(!executorsPendingToRemove(manager).contains("1"))
+ onExecutorRemoved(manager, "2")
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ assert(!executorsPendingToRemove(manager).contains("2"))
+ assert(!executorsPendingToRemove(manager).contains("3"))
+ onExecutorRemoved(manager, "2") // duplicates should not count
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ onExecutorRemoved(manager, "4")
+ onExecutorRemoved(manager, "5")
+ assert(executorsPendingToRemove(manager).isEmpty)
+
+ // Try removing again
+ // This should still fail because the number pending + running is still at the limit
+ assert(!removeExecutor(manager, "7"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutors(manager, Seq("8")) !== Seq("8"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ }
+
test ("interleaving add and remove") {
sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
@@ -283,8 +334,7 @@ class ExecutorAllocationManagerSuite
// Remove until limit
assert(removeExecutor(manager, "1"))
- assert(removeExecutor(manager, "2"))
- assert(removeExecutor(manager, "3"))
+ assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
assert(!removeExecutor(manager, "4")) // lower limit reached
assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
@@ -296,7 +346,7 @@ class ExecutorAllocationManagerSuite
assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
assert(!removeExecutor(manager, "4")) // still at lower limit
- assert(!removeExecutor(manager, "5"))
+ assert(removeExecutors(manager, Seq("5")) !== Seq("5"))
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
@@ -305,9 +355,7 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager).size === 10)
// Remove succeeds again, now that we are no longer at the lower limit
- assert(removeExecutor(manager, "4"))
- assert(removeExecutor(manager, "5"))
- assert(removeExecutor(manager, "6"))
+ assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
assert(addExecutors(manager) === 0)
@@ -870,8 +918,8 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
removeExecutor(manager, "first")
- removeExecutor(manager, "second")
- assert(executorsPendingToRemove(manager) === Set("first", "second"))
+ removeExecutors(manager, Seq("second", "third"))
+ assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
@@ -895,7 +943,7 @@ class ExecutorAllocationManagerSuite
maxExecutors: Int = 5,
initialExecutors: Int = 1): SparkContext = {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
@@ -953,6 +1001,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _updateAndSyncNumExecutorsTarget =
PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+ private val _removeExecutors = PrivateMethod[Seq[String]]('removeExecutors)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
@@ -1008,6 +1057,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _removeExecutor(id)
}
+ private def removeExecutors(manager: ExecutorAllocationManager, ids: Seq[String]): Seq[String] = {
+ manager invokePrivate _removeExecutors(ids)
+ }
+
private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
manager invokePrivate _onExecutorAdded(id)
}
@@ -1040,3 +1093,65 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _hostToLocalTaskCount()
}
}
+
+/**
+ * A cluster manager which wraps around the scheduler and backend for local mode. It is used for
+ * testing the dynamic allocation policy.
+ */
+private class DummyLocalExternalClusterManager extends ExternalClusterManager {
+
+ def canCreate(masterURL: String): Boolean = masterURL == "myDummyLocalExternalClusterManager"
+
+ override def createTaskScheduler(
+ sc: SparkContext,
+ masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+
+ override def createSchedulerBackend(
+ sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ val sb = new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)
+ new DummyLocalSchedulerBackend(sc, sb)
+ }
+
+ override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
+ sc.initialize(backend)
+ }
+}
+
+/**
+ * A scheduler backend which wraps around local scheduler backend and exposes the executor
+ * allocation client interface for testing dynamic allocation.
+ */
+private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend)
+ extends SchedulerBackend with ExecutorAllocationClient {
+
+ override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds()
+
+ override private[spark] def requestTotalExecutors(
+ numExecutors: Int,
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int]): Boolean =
+ sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+
+ override def requestExecutors(numAdditionalExecutors: Int): Boolean =
+ sc.requestExecutors(numAdditionalExecutors)
+
+ override def killExecutors(executorIds: Seq[String]): Seq[String] = {
+ val response = sc.killExecutors(executorIds)
+ if (response) {
+ executorIds
+ } else {
+ Seq.empty[String]
+ }
+ }
+
+ override def start(): Unit = sb.start()
+
+ override def stop(): Unit = sb.stop()
+
+ override def reviveOffers(): Unit = sb.reviveOffers()
+
+ override def defaultParallelism(): Int = sb.defaultParallelism()
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 814027076d..e29eb8552e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -438,12 +438,12 @@ class StandaloneDynamicAllocationSuite
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
- assert(!killExecutor(sc, executors.head, force = false))
+ assert(killExecutor(sc, executors.head, force = false).isEmpty)
apps = getApplications()
assert(apps.head.executors.size === 2)
// force kill busy executor
- assert(killExecutor(sc, executors.head, force = true))
+ assert(killExecutor(sc, executors.head, force = true).nonEmpty)
apps = getApplications()
// kill executor successfully
assert(apps.head.executors.size === 1)
@@ -518,7 +518,7 @@ class StandaloneDynamicAllocationSuite
}
/** Kill the given executor, specifying whether to force kill it. */
- private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+ private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Seq[String] = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>