diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 23 |
1 file changed, 21 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 61f9e0974c..ac18f73ea8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,11 +17,14 @@ package org.apache.spark.streaming.scheduler +import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConverters._ import scala.util.Failure +import org.apache.commons.lang.SerializationUtils + import org.apache.spark.internal.Logging import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ @@ -57,6 +60,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // A tracker to track all the input stream information as well as processed record number var inputInfoTracker: InputInfoTracker = null + private var executorAllocationManager: Option[ExecutorAllocationManager] = None + private var eventLoop: EventLoop[JobSchedulerEvent] = null def start(): Unit = synchronized { @@ -79,8 +84,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) + executorAllocationManager = ExecutorAllocationManager.createIfEnabled( + ssc.sparkContext, + receiverTracker, + ssc.conf, + ssc.graph.batchDuration.milliseconds, + clock) + executorAllocationManager.foreach(ssc.addStreamingListener) receiverTracker.start() jobGenerator.start() + executorAllocationManager.foreach(_.start()) logInfo("Started JobScheduler") } @@ -93,6 +106,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { receiverTracker.stop(processAllReceivedData) } + if (executorAllocationManager != null) { + executorAllocationManager.foreach(_.stop()) + } + // Second, stop generating jobs. 
If it has to process all received data, // then this will wait for all the processing through JobScheduler to be over. jobGenerator.stop(processAllReceivedData) @@ -200,7 +217,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { import JobScheduler._ def run() { + val oldProps = ssc.sparkContext.getLocalProperties try { + ssc.sparkContext.setLocalProperties( + SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties]) val formattedTime = UIUtils.formatBatchTime( job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}" @@ -234,8 +254,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // JobScheduler has been stopped. } } finally { - ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) - ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) + ssc.sparkContext.setLocalProperties(oldProps) } } } |