diff options
author | Dhruve Ashar <dashar@yahoo-inc.com> | 2016-09-22 10:10:37 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-09-22 10:10:37 -0700 |
commit | 17b72d31e0c59711eddeb525becb8085930eadcc (patch) | |
tree | 89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7 /streaming | |
parent | 8a02410a92429bff50d6ce082f873cea9e9fa91e (diff) | |
download | spark-17b72d31e0c59711eddeb525becb8085930eadcc.tar.gz spark-17b72d31e0c59711eddeb525becb8085930eadcc.tar.bz2 spark-17b72d31e0c59711eddeb525becb8085930eadcc.zip |
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?
We are killing multiple executors together instead of iterating over expensive RPC calls to kill single executor.
## How was this patch tested?
Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled.
Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>
Closes #15152 from dhruve/impr/SPARK-17365.
Diffstat (limited to 'streaming')
3 files changed, 12 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index fb5587edec..7b29b40668 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -226,7 +226,7 @@ private[streaming] object ExecutorAllocationManager extends Logging { conf: SparkConf, batchDurationMs: Long, clock: Clock): Option[ExecutorAllocationManager] = { - if (isDynamicAllocationEnabled(conf)) { + if (isDynamicAllocationEnabled(conf) && client != null) { Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock)) } else None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 79d6254eb3..dbc50da21c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -24,6 +24,7 @@ import scala.util.Failure import org.apache.commons.lang3.SerializationUtils +import org.apache.spark.ExecutorAllocationClient import org.apache.spark.internal.Logging import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ @@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) + + val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match { + case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient] + case _ => null + } + executorAllocationManager = ExecutorAllocationManager.createIfEnabled( - ssc.sparkContext, + executorAllocClient, receiverTracker, ssc.conf, ssc.graph.batchDuration.milliseconds, diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 7630f4a75e..b49e579071 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -380,8 +380,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = { - conf.setMaster("local").setAppName(this.getClass.getSimpleName).set( - "spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation + conf.setMaster("myDummyLocalExternalClusterManager") + .setAppName(this.getClass.getSimpleName) + .set("spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation var ssc: StreamingContext = null try { |