aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 79d6254eb3..dbc50da21c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,6 +24,7 @@ import scala.util.Failure
import org.apache.commons.lang3.SerializationUtils
+import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
+
+ val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
+ case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
+ case _ => null
+ }
+
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
- ssc.sparkContext,
+ executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,