author     Dhruve Ashar <dashar@yahoo-inc.com>   2016-09-22 10:10:37 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>  2016-09-22 10:10:37 -0700
commit     17b72d31e0c59711eddeb525becb8085930eadcc (patch)
tree       89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7 /streaming
parent     8a02410a92429bff50d6ce082f873cea9e9fa91e (diff)
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?

Kill multiple executors together in a single request instead of iterating over expensive RPC calls that remove one executor at a time.

## How was this patch tested?

Ran a sample Spark job with dynamic allocation enabled and observed executors being killed/removed in batches.

Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15152 from dhruve/impr/SPARK-17365.
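To illustrate the intent of the change, here is a minimal Scala sketch of batching executor kills into one RPC instead of issuing one call per executor. The `RpcClient` trait and its methods are hypothetical stand-ins that mirror the idea behind `ExecutorAllocationClient`; they are not the actual Spark API.

```scala
// Illustrative sketch only; the real interface is ExecutorAllocationClient.
trait RpcClient {
  // One RPC round trip per call.
  def killExecutor(executorId: String): Boolean
  // A single RPC round trip for the whole batch.
  def killExecutors(executorIds: Seq[String]): Boolean
}

object KillExecutorsExample {
  // Before: N executors => N RPC calls.
  def killOneByOne(client: RpcClient, ids: Seq[String]): Unit =
    ids.foreach(client.killExecutor)

  // After: N executors => 1 RPC call.
  def killTogether(client: RpcClient, ids: Seq[String]): Unit =
    if (ids.nonEmpty) client.killExecutors(ids)
}
```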
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala       2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala                    9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala  5
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index fb5587edec..7b29b40668 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -226,7 +226,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
conf: SparkConf,
batchDurationMs: Long,
clock: Clock): Option[ExecutorAllocationManager] = {
- if (isDynamicAllocationEnabled(conf)) {
+ if (isDynamicAllocationEnabled(conf) && client != null) {
Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock))
} else None
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 79d6254eb3..dbc50da21c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,6 +24,7 @@ import scala.util.Failure
import org.apache.commons.lang3.SerializationUtils
+import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
+
+ val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
+ case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
+ case _ => null
+ }
+
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
- ssc.sparkContext,
+ executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
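A simplified sketch of how the two hunks above cooperate: the `JobScheduler` extracts an `ExecutorAllocationClient` from the scheduler backend (or `null` when the backend does not implement it), and `createIfEnabled` now refuses to build a manager without a usable client. Type and method names below are stand-ins for the real ones in `org.apache.spark` and `org.apache.spark.streaming.scheduler`, not the actual classes.

```scala
// Simplified, self-contained sketch; not the actual Spark classes.
trait ExecutorAllocationClient

object AllocationWiringSketch {
  // JobScheduler side: only backends that implement ExecutorAllocationClient
  // can drive dynamic allocation; anything else yields null.
  def clientFrom(schedulerBackend: AnyRef): ExecutorAllocationClient =
    schedulerBackend match {
      case c: ExecutorAllocationClient => c
      case _ => null
    }

  // ExecutorAllocationManager side: the new guard skips creating a manager
  // when no usable client was found.
  def createIfEnabled(client: ExecutorAllocationClient,
                      dynamicAllocationEnabled: Boolean): Option[String] =
    if (dynamicAllocationEnabled && client != null) Some("manager") else None
}
```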
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7630f4a75e..b49e579071 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -380,8 +380,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
- conf.setMaster("local").setAppName(this.getClass.getSimpleName).set(
- "spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation
+ conf.setMaster("myDummyLocalExternalClusterManager")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation
var ssc: StreamingContext = null
try {