From 17b72d31e0c59711eddeb525becb8085930eadcc Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Thu, 22 Sep 2016 10:10:37 -0700 Subject: [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time. ## What changes were proposed in this pull request? We are killing multiple executors together instead of iterating over expensive RPC calls to kill a single executor. ## How was this patch tested? Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled. Author: Dhruve Ashar Author: Dhruve Ashar Closes #15152 from dhruve/impr/SPARK-17365. --- .../spark/streaming/scheduler/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 9 ++++++++- .../streaming/scheduler/ExecutorAllocationManagerSuite.scala | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index fb5587edec..7b29b40668 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -226,7 +226,7 @@ private[streaming] object ExecutorAllocationManager extends Logging { conf: SparkConf, batchDurationMs: Long, clock: Clock): Option[ExecutorAllocationManager] = { - if (isDynamicAllocationEnabled(conf)) { + if (isDynamicAllocationEnabled(conf) && client != null) { Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock)) } else None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 79d6254eb3..dbc50da21c 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -24,6 +24,7 @@ import scala.util.Failure import org.apache.commons.lang3.SerializationUtils +import org.apache.spark.ExecutorAllocationClient import org.apache.spark.internal.Logging import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ @@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) + + val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match { + case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient] + case _ => null + } + executorAllocationManager = ExecutorAllocationManager.createIfEnabled( - ssc.sparkContext, + executorAllocClient, receiverTracker, ssc.conf, ssc.graph.batchDuration.milliseconds, diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 7630f4a75e..b49e579071 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -380,8 +380,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = { - conf.setMaster("local").setAppName(this.getClass.getSimpleName).set( - "spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation + conf.setMaster("myDummyLocalExternalClusterManager") + .setAppName(this.getClass.getSimpleName) + .set("spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation var ssc: 
StreamingContext = null try { -- cgit v1.2.3