aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-04-06 15:46:20 -0700
committerAndrew Or <andrew@databricks.com>2016-04-06 15:46:20 -0700
commit9af5423ec28258becf27dbe89833b4f7d324d26a (patch)
tree2578d6e122b485e955b6ae389c7f36a46cd051a2
parentde4792605ad94d3d7548a2139372bb6cac331079 (diff)
downloadspark-9af5423ec28258becf27dbe89833b4f7d324d26a.tar.gz
spark-9af5423ec28258becf27dbe89833b4f7d324d26a.tar.bz2
spark-9af5423ec28258becf27dbe89833b4f7d324d26a.zip
[SPARK-12133][STREAMING] Streaming dynamic allocation
## What changes were proposed in this pull request? Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation. ## How was this patch tested Unit tests, and cluster tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12154 from tdas/streaming-dynamic-allocation.
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala233
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala19
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala395
8 files changed, 683 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 842bfdbadc..8baddf45bf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -23,6 +23,10 @@ package org.apache.spark
*/
private[spark] trait ExecutorAllocationClient {
+
+ /** Get the list of currently active executors */
+ private[spark] def getExecutorIds(): Seq[String]
+
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4b3264cbf5..c40fada64b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}
+ private[spark] override def getExecutorIds(): Seq[String] = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.getExecutorIds()
+ case _ =>
+ logWarning("Requesting executors is only supported in coarse-grained mode")
+ Nil
+ }
+ }
+
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71bfd489d..e5abf0e150 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -430,6 +430,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private def numExistingExecutors: Int = executorDataMap.size
+ override def getExecutorIds(): Seq[String] = {
+ executorDataMap.keySet.toSeq
+ }
+
/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 83a1092b16..cc187f5cb4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -527,11 +527,12 @@ class StreamingContext private[streaming] (
}
}
- if (Utils.isDynamicAllocationEnabled(sc.conf)) {
+ if (Utils.isDynamicAllocationEnabled(sc.conf) ||
+ ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
logWarning("Dynamic Allocation is enabled for this application. " +
"Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
"Write Ahead Log is not enabled for non-replayable sources like Flume. " +
- "See the programming guide for details on how to enable the Write Ahead Log")
+ "See the programming guide for details on how to enable the Write Ahead Log.")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
new file mode 100644
index 0000000000..f7b6584893
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.scheduler
+
+import scala.util.Random
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * executors based on the statistics of the streaming computation. This is different from the core
+ * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
+ * micro-batch model of streaming prevents any particular executors from being idle for a long
+ * time. Instead, the measure of "idle-ness" needs to be based on the time taken to process
+ * each batch.
+ *
+ * At a high level, the policy implemented by this class is as follows:
+ * - Use StreamingListener interface get batch processing times of completed batches
+ * - Periodically take the average batch completion times and compare with the batch interval
+ * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors.
+ * The number of executors requested is based on the ratio = (avg. proc. time / batch interval).
+ * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
+ * is not running a receiver.
+ *
+ * This features should ideally be used in conjunction with backpressure, as backpressure ensures
+ * system stability, while executors are being readjusted.
+ */
+private[streaming] class ExecutorAllocationManager(
+ client: ExecutorAllocationClient,
+ receiverTracker: ReceiverTracker,
+ conf: SparkConf,
+ batchDurationMs: Long,
+ clock: Clock) extends StreamingListener with Logging {
+
+ import ExecutorAllocationManager._
+
+ private val scalingIntervalSecs = conf.getTimeAsSeconds(
+ SCALING_INTERVAL_KEY,
+ s"${SCALING_INTERVAL_DEFAULT_SECS}s")
+ private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
+ private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
+ private val minNumExecutors = conf.getInt(
+ MIN_EXECUTORS_KEY,
+ math.max(1, receiverTracker.numReceivers))
+ private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
+ private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
+ _ => manageAllocation(), "streaming-executor-allocation-manager")
+
+ @volatile private var batchProcTimeSum = 0L
+ @volatile private var batchProcTimeCount = 0
+
+ validateSettings()
+
+ def start(): Unit = {
+ timer.start()
+ logInfo(s"ExecutorAllocationManager started with " +
+ s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
+ }
+
+ def stop(): Unit = {
+ timer.stop(interruptTimer = true)
+ logInfo("ExecutorAllocationManager stopped")
+ }
+
+ /**
+ * Manage executor allocation by requesting or killing executors based on the collected
+ * batch statistics.
+ */
+ private def manageAllocation(): Unit = synchronized {
+ logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
+ if (batchProcTimeCount > 0) {
+ val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
+ val ratio = averageBatchProcTime.toDouble / batchDurationMs
+ logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
+ if (ratio >= scalingUpRatio) {
+ logDebug("Requesting executors")
+ val numNewExecutors = math.max(math.round(ratio).toInt, 1)
+ requestExecutors(numNewExecutors)
+ } else if (ratio <= scalingDownRatio) {
+ logDebug("Killing executors")
+ killExecutor()
+ }
+ }
+ batchProcTimeSum = 0
+ batchProcTimeCount = 0
+ }
+
+ /** Request the specified number of executors over the currently active one */
+ private def requestExecutors(numNewExecutors: Int): Unit = {
+ require(numNewExecutors >= 1)
+ val allExecIds = client.getExecutorIds()
+ logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+ val targetTotalExecutors =
+ math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
+ client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
+ logInfo(s"Requested total $targetTotalExecutors executors")
+ }
+
+ /** Kill an executor that is not running any receiver, if possible */
+ private def killExecutor(): Unit = {
+ val allExecIds = client.getExecutorIds()
+ logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+
+ if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
+ val execIdsWithReceivers = receiverTracker.allocatedExecutors.values.flatten.toSeq
+ logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
+
+ val removableExecIds = allExecIds.diff(execIdsWithReceivers)
+ logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
+ if (removableExecIds.nonEmpty) {
+ val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
+ client.killExecutor(execIdToRemove)
+ logInfo(s"Requested to kill executor $execIdToRemove")
+ } else {
+ logInfo(s"No non-receiver executors to kill")
+ }
+ } else {
+ logInfo("No available executor to kill")
+ }
+ }
+
+ private def addBatchProcTime(timeMs: Long): Unit = synchronized {
+ batchProcTimeSum += timeMs
+ batchProcTimeCount += 1
+ logDebug(
+ s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
+ }
+
+ private def validateSettings(): Unit = {
+ require(
+ scalingIntervalSecs > 0,
+ s"Config $SCALING_INTERVAL_KEY must be more than 0")
+
+ require(
+ scalingUpRatio > 0,
+ s"Config $SCALING_UP_RATIO_KEY must be more than 0")
+
+ require(
+ scalingDownRatio > 0,
+ s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
+
+ require(
+ minNumExecutors > 0,
+ s"Config $MIN_EXECUTORS_KEY must be more than 0")
+
+ require(
+ maxNumExecutors > 0,
+ s"$MAX_EXECUTORS_KEY must be more than 0")
+
+ require(
+ scalingUpRatio > scalingDownRatio,
+ s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
+
+ if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
+ require(
+ maxNumExecutors >= minNumExecutors,
+ s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
+ }
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ logDebug("onBatchCompleted called: " + batchCompleted)
+ if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
+ batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
+ }
+ }
+}
+
+private[streaming] object ExecutorAllocationManager extends Logging {
+ val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
+
+ val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
+ val SCALING_INTERVAL_DEFAULT_SECS = 60
+
+ val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
+ val SCALING_UP_RATIO_DEFAULT = 0.9
+
+ val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
+ val SCALING_DOWN_RATIO_DEFAULT = 0.3
+
+ val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
+
+ val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
+
+ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+ val numExecutor = conf.getInt("spark.executor.instances", 0)
+ val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
+ if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
+ throw new IllegalArgumentException(
+ "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
+ }
+ if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
+ throw new IllegalArgumentException(
+ """
+ |Dynamic Allocation cannot be enabled for both streaming and core at the same time.
+ |Please disable core Dynamic Allocation by setting spark.dynamicAllocation.enabled to
+ |false to use Dynamic Allocation in streaming.
+ """.stripMargin)
+ }
+ val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
+ numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+ }
+
+ def createIfEnabled(
+ client: ExecutorAllocationClient,
+ receiverTracker: ReceiverTracker,
+ conf: SparkConf,
+ batchDurationMs: Long,
+ clock: Clock): Option[ExecutorAllocationManager] = {
+ if (isDynamicAllocationEnabled(conf)) {
+ Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock))
+ } else None
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 61f9e0974c..303c325274 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -57,6 +57,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null
+ private var executorAllocationManager: Option[ExecutorAllocationManager] = None
+
private var eventLoop: EventLoop[JobSchedulerEvent] = null
def start(): Unit = synchronized {
@@ -79,8 +81,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
+ executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
+ ssc.sparkContext,
+ receiverTracker,
+ ssc.conf,
+ ssc.graph.batchDuration.milliseconds,
+ clock)
+ executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()
jobGenerator.start()
+ executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
}
@@ -93,6 +103,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
receiverTracker.stop(processAllReceivedData)
}
+ if (executorAllocationManager != null) {
+ executorAllocationManager.foreach(_.stop())
+ }
+
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
jobGenerator.stop(processAllReceivedData)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b3ae287001..d67f70732d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -92,6 +92,8 @@ private[streaming] case object AllReceiverIds extends ReceiverTrackerLocalMessag
private[streaming] case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long)
extends ReceiverTrackerLocalMessage
+private[streaming] case object GetAllReceiverInfo extends ReceiverTrackerLocalMessage
+
/**
* This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
* this class must be created after all input streams have been added and StreamingContext.start()
@@ -234,6 +236,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
+ /**
+ * Get the executors allocated to each receiver.
+ * @return a map containing receiver ids to optional executor ids.
+ */
+ def allocatedExecutors(): Map[Int, Option[String]] = {
+ endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ _.runningExecutor.map { _.executorId }
+ }
+ }
+
+ def numReceivers(): Int = {
+ receiverInputStreams.size
+ }
+
/** Register a receiver */
private def registerReceiver(
streamId: Int,
@@ -506,9 +522,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
context.reply(true)
+
// Local messages
case AllReceiverIds =>
context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
+ case GetAllReceiverInfo =>
+ context.reply(receiverTrackingInfos.toMap)
case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
new file mode 100644
index 0000000000..7630f4a75e
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.mockito.Matchers.{eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually.{eventually, timeout}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
+import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
+import org.apache.spark.util.{ManualClock, Utils}
+
+
+class ExecutorAllocationManagerSuite extends SparkFunSuite
+ with BeforeAndAfter with BeforeAndAfterAll with MockitoSugar with PrivateMethodTester {
+
+ import ExecutorAllocationManager._
+
+ private val batchDurationMillis = 1000L
+ private var allocationClient: ExecutorAllocationClient = null
+ private var clock: ManualClock = null
+
+ before {
+ allocationClient = mock[ExecutorAllocationClient]
+ clock = new ManualClock()
+ }
+
+ test("basic functionality") {
+ // Test that adding batch processing time info to allocation manager
+ // causes executors to be requested and killed accordingly
+
+ // There is 1 receiver, and exec 1 has been allocated to it
+ withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) =>
+ when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1")))
+
+ /** Add data point for batch processing time and verify executor allocation */
+ def addBatchProcTimeAndVerifyAllocation(batchProcTimeMs: Double)(body: => Unit): Unit = {
+ // 2 active executors
+ reset(allocationClient)
+ when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
+ addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
+ clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+ eventually(timeout(10 seconds)) {
+ body
+ }
+ }
+
+ /** Verify that the expected number of total executor were requested */
+ def verifyTotalRequestedExecs(expectedRequestedTotalExecs: Option[Int]): Unit = {
+ if (expectedRequestedTotalExecs.nonEmpty) {
+ require(expectedRequestedTotalExecs.get > 0)
+ verify(allocationClient, times(1)).requestTotalExecutors(
+ meq(expectedRequestedTotalExecs.get), meq(0), meq(Map.empty))
+ } else {
+ verify(allocationClient, never).requestTotalExecutors(0, 0, Map.empty)
+ }
+ }
+
+ /** Verify that a particular executor was killed */
+ def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
+ if (expectedKilledExec.nonEmpty) {
+ verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+ } else {
+ verify(allocationClient, never).killExecutor(null)
+ }
+ }
+
+ // Batch proc time = batch interval, should increase allocation by 1
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
+ verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1
+ verifyKilledExec(None)
+ }
+
+ // Batch proc time = batch interval * 2, should increase allocation by 2
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
+ verifyTotalRequestedExecs(Some(4))
+ verifyKilledExec(None)
+ }
+
+ // Batch proc time slightly more than the scale up ratio, should increase allocation by 1
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT + 1) {
+ verifyTotalRequestedExecs(Some(3))
+ verifyKilledExec(None)
+ }
+
+ // Batch proc time slightly less than the scale up ratio, should not change allocation
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT - 1) {
+ verifyTotalRequestedExecs(None)
+ verifyKilledExec(None)
+ }
+
+ // Batch proc time slightly more than the scale down ratio, should not change allocation
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT + 1) {
+ verifyTotalRequestedExecs(None)
+ verifyKilledExec(None)
+ }
+
+ // Batch proc time slightly more than the scale down ratio, should not change allocation
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT - 1) {
+ verifyTotalRequestedExecs(None)
+ verifyKilledExec(Some("2"))
+ }
+ }
+ }
+
+ test("requestExecutors policy") {
+
+ /** Verify that the expected number of total executor were requested */
+ def verifyRequestedExecs(
+ numExecs: Int,
+ numNewExecs: Int,
+ expectedRequestedTotalExecs: Int)(
+ implicit allocationManager: ExecutorAllocationManager): Unit = {
+ reset(allocationClient)
+ when(allocationClient.getExecutorIds()).thenReturn((1 to numExecs).map(_.toString))
+ requestExecutors(allocationManager, numNewExecs)
+ verify(allocationClient, times(1)).requestTotalExecutors(
+ meq(expectedRequestedTotalExecs), meq(0), meq(Map.empty))
+ }
+
+ withAllocationManager(numReceivers = 1) { case (_, allocationManager) =>
+ implicit val am = allocationManager
+ intercept[IllegalArgumentException] {
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 0, 0)
+ }
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 1)
+ verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+ }
+
+ withAllocationManager(numReceivers = 2) { case(_, allocationManager) =>
+ implicit val am = allocationManager
+
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+ }
+
+ withAllocationManager(
+ // Test min 2 executors
+ new SparkConf().set("spark.streaming.dynamicAllocation.minExecutors", "2")) {
+ case (_, allocationManager) =>
+ implicit val am = allocationManager
+
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 3, expectedRequestedTotalExecs = 3)
+ verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 1, numNewExecs = 2, expectedRequestedTotalExecs = 3)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 1, expectedRequestedTotalExecs = 3)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+ }
+
+ withAllocationManager(
+ // Test with max 2 executors
+ new SparkConf().set("spark.streaming.dynamicAllocation.maxExecutors", "2")) {
+ case (_, allocationManager) =>
+ implicit val am = allocationManager
+
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 1)
+ verifyRequestedExecs(numExecs = 0, numNewExecs = 3, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 1, numNewExecs = 2, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+ verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 2)
+ }
+ }
+
+ test("killExecutor policy") {
+
+ /**
+ * Verify that a particular executor was killed, given active executors and executors
+ * allocated to receivers.
+ */
+ def verifyKilledExec(
+ execIds: Seq[String],
+ receiverExecIds: Map[Int, Option[String]],
+ expectedKilledExec: Option[String])(
+ implicit x: (ReceiverTracker, ExecutorAllocationManager)): Unit = {
+ val (receiverTracker, allocationManager) = x
+
+ reset(allocationClient)
+ when(allocationClient.getExecutorIds()).thenReturn(execIds)
+ when(receiverTracker.allocatedExecutors).thenReturn(receiverExecIds)
+ killExecutor(allocationManager)
+ if (expectedKilledExec.nonEmpty) {
+ verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+ } else {
+ verify(allocationClient, never).killExecutor(null)
+ }
+ }
+
+ withAllocationManager() { case (receiverTracker, allocationManager) =>
+ implicit val rcvrTrackerAndExecAllocMgr = (receiverTracker, allocationManager)
+
+ verifyKilledExec(Nil, Map.empty, None)
+ verifyKilledExec(Seq("1", "2"), Map.empty, None)
+ verifyKilledExec(Seq("1"), Map(1 -> Some("1")), None)
+ verifyKilledExec(Seq("1", "2"), Map(1 -> Some("1")), Some("2"))
+ verifyKilledExec(Seq("1", "2"), Map(1 -> Some("1"), 2 -> Some("2")), None)
+ }
+
+ withAllocationManager(
+ new SparkConf().set("spark.streaming.dynamicAllocation.minExecutors", "2")) {
+ case (receiverTracker, allocationManager) =>
+ implicit val rcvrTrackerAndExecAllocMgr = (receiverTracker, allocationManager)
+
+ verifyKilledExec(Seq("1", "2"), Map.empty, None)
+ verifyKilledExec(Seq("1", "2", "3"), Map(1 -> Some("1"), 2 -> Some("2")), Some("3"))
+ }
+ }
+
+ test("parameter validation") {
+
+ def validateParams(
+ numReceivers: Int = 1,
+ scalingIntervalSecs: Option[Int] = None,
+ scalingUpRatio: Option[Double] = None,
+ scalingDownRatio: Option[Double] = None,
+ minExecs: Option[Int] = None,
+ maxExecs: Option[Int] = None): Unit = {
+ require(numReceivers > 0)
+ val receiverTracker = mock[ReceiverTracker]
+ when(receiverTracker.numReceivers()).thenReturn(numReceivers)
+ val conf = new SparkConf()
+ if (scalingIntervalSecs.nonEmpty) {
+ conf.set(
+ "spark.streaming.dynamicAllocation.scalingInterval",
+ s"${scalingIntervalSecs.get}s")
+ }
+ if (scalingUpRatio.nonEmpty) {
+ conf.set("spark.streaming.dynamicAllocation.scalingUpRatio", scalingUpRatio.get.toString)
+ }
+ if (scalingDownRatio.nonEmpty) {
+ conf.set(
+ "spark.streaming.dynamicAllocation.scalingDownRatio",
+ scalingDownRatio.get.toString)
+ }
+ if (minExecs.nonEmpty) {
+ conf.set("spark.streaming.dynamicAllocation.minExecutors", minExecs.get.toString)
+ }
+ if (maxExecs.nonEmpty) {
+ conf.set("spark.streaming.dynamicAllocation.maxExecutors", maxExecs.get.toString)
+ }
+ new ExecutorAllocationManager(
+ allocationClient, receiverTracker, conf, batchDurationMillis, clock)
+ }
+
+ validateParams(numReceivers = 1)
+ validateParams(numReceivers = 2, minExecs = Some(1))
+ validateParams(numReceivers = 2, minExecs = Some(3))
+ validateParams(numReceivers = 2, maxExecs = Some(3))
+ validateParams(numReceivers = 2, maxExecs = Some(1))
+ validateParams(minExecs = Some(3), maxExecs = Some(3))
+ validateParams(scalingIntervalSecs = Some(1))
+ validateParams(scalingUpRatio = Some(1.1))
+ validateParams(scalingDownRatio = Some(0.1))
+ validateParams(scalingUpRatio = Some(1.1), scalingDownRatio = Some(0.1))
+
+ intercept[IllegalArgumentException] {
+ validateParams(minExecs = Some(0))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(minExecs = Some(-1))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(maxExecs = Some(0))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(maxExecs = Some(-1))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(minExecs = Some(4), maxExecs = Some(3))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingIntervalSecs = Some(-1))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingIntervalSecs = Some(0))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingUpRatio = Some(-0.1))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingUpRatio = Some(0))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingDownRatio = Some(-0.1))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingDownRatio = Some(0))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingUpRatio = Some(0.5), scalingDownRatio = Some(0.5))
+ }
+ intercept[IllegalArgumentException] {
+ validateParams(scalingUpRatio = Some(0.3), scalingDownRatio = Some(0.5))
+ }
+ }
+
+ test("enabling and disabling") {
+ withStreamingContext(new SparkConf()) { ssc =>
+ ssc.start()
+ assert(getExecutorAllocationManager(ssc).isEmpty)
+ }
+
+ withStreamingContext(
+ new SparkConf().set("spark.streaming.dynamicAllocation.enabled", "true")) { ssc =>
+ ssc.start()
+ assert(getExecutorAllocationManager(ssc).nonEmpty)
+ }
+
+ val confWithBothDynamicAllocationEnabled = new SparkConf()
+ .set("spark.streaming.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+ require(Utils.isDynamicAllocationEnabled(confWithBothDynamicAllocationEnabled) === true)
+ withStreamingContext(confWithBothDynamicAllocationEnabled) { ssc =>
+ intercept[IllegalArgumentException] {
+ ssc.start()
+ }
+ }
+ }
+
+ private def withAllocationManager(
+ conf: SparkConf = new SparkConf,
+ numReceivers: Int = 1
+ )(body: (ReceiverTracker, ExecutorAllocationManager) => Unit): Unit = {
+
+ val receiverTracker = mock[ReceiverTracker]
+ when(receiverTracker.numReceivers()).thenReturn(numReceivers)
+
+ val manager = new ExecutorAllocationManager(
+ allocationClient, receiverTracker, conf, batchDurationMillis, clock)
+ try {
+ manager.start()
+ body(receiverTracker, manager)
+ } finally {
+ manager.stop()
+ }
+ }
+
+ private val _addBatchProcTime = PrivateMethod[Unit]('addBatchProcTime)
+ private val _requestExecutors = PrivateMethod[Unit]('requestExecutors)
+ private val _killExecutor = PrivateMethod[Unit]('killExecutor)
+ private val _executorAllocationManager =
+ PrivateMethod[Option[ExecutorAllocationManager]]('executorAllocationManager)
+
+ private def addBatchProcTime(manager: ExecutorAllocationManager, timeMs: Long): Unit = {
+ manager invokePrivate _addBatchProcTime(timeMs)
+ }
+
+ private def requestExecutors(manager: ExecutorAllocationManager, newExecs: Int): Unit = {
+ manager invokePrivate _requestExecutors(newExecs)
+ }
+
+ private def killExecutor(manager: ExecutorAllocationManager): Unit = {
+ manager invokePrivate _killExecutor()
+ }
+
+ private def getExecutorAllocationManager(
+ ssc: StreamingContext): Option[ExecutorAllocationManager] = {
+ ssc.scheduler invokePrivate _executorAllocationManager()
+ }
+
+ private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
+ conf.setMaster("local").setAppName(this.getClass.getSimpleName).set(
+ "spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation
+
+ var ssc: StreamingContext = null
+ try {
+ ssc = new StreamingContext(conf, Seconds(1))
+ new DummyInputDStream(ssc).foreachRDD(_ => { })
+ body(ssc)
+ } finally {
+ if (ssc != null) ssc.stop()
+ }
+ }
+}