Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 23 +++++++++++++++++++++--
1 file changed, 21 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 61f9e0974c..ac18f73ea8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,11 +17,14 @@
 package org.apache.spark.streaming.scheduler
 
+import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
+import org.apache.commons.lang.SerializationUtils
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming._
@@ -57,6 +60,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   // A tracker to track all the input stream information as well as processed record number
   var inputInfoTracker: InputInfoTracker = null
 
+  private var executorAllocationManager: Option[ExecutorAllocationManager] = None
+
   private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
   def start(): Unit = synchronized {
@@ -79,8 +84,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
+    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
+      ssc.sparkContext,
+      receiverTracker,
+      ssc.conf,
+      ssc.graph.batchDuration.milliseconds,
+      clock)
+    executorAllocationManager.foreach(ssc.addStreamingListener)
     receiverTracker.start()
     jobGenerator.start()
+    executorAllocationManager.foreach(_.start())
     logInfo("Started JobScheduler")
   }
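The hunk above wires an optional ExecutorAllocationManager into JobScheduler.start(): createIfEnabled returns Some(...) only when the feature is switched on in the SparkConf, the manager is registered as a streaming listener, and it is started only after the job generator is up. A minimal sketch of enabling the feature from application code, assuming the spark.streaming.dynamicAllocation.enabled key that createIfEnabled consults (verify the exact key against your Spark version):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: the config key is an assumption inferred from this patch;
// confirm it in your Spark version before relying on it.
val conf = new SparkConf()
  .setAppName("streaming-dynamic-allocation-sketch")
  .set("spark.streaming.dynamicAllocation.enabled", "true")

// The batch duration chosen here is what start() later hands to
// createIfEnabled via ssc.graph.batchDuration.milliseconds.
val ssc = new StreamingContext(conf, Seconds(10))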
@@ -93,6 +106,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       receiverTracker.stop(processAllReceivedData)
     }
 
+    if (executorAllocationManager != null) {
+      executorAllocationManager.foreach(_.stop())
+    }
+
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.
     jobGenerator.stop(processAllReceivedData)
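In stop(), the allocation manager is halted after the receivers but before job generation shuts down, and the null check guards against stop() being called before start() ever assigned the field. Because start() registers the manager via ssc.addStreamingListener, it is driven by batch-level events. An illustrative listener (not Spark's ExecutorAllocationManager) showing the per-batch signal such a component receives:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Illustrative only: reports the per-batch processing delay, the kind of
// metric a scaling policy registered with addStreamingListener can act on.
class BatchDelayListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batch.batchInfo.processingDelay.getOrElse(0L)
    println(s"batch at ${batch.batchInfo.batchTime.milliseconds} ms took $delayMs ms")
  }
}

// Registered the same way the patch registers the manager:
//   ssc.addStreamingListener(new BatchDelayListener)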
@@ -200,7 +217,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     import JobScheduler._
 
     def run() {
+      val oldProps = ssc.sparkContext.getLocalProperties
       try {
+        ssc.sparkContext.setLocalProperties(
+          SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
         val formattedTime = UIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
@@ -234,8 +254,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           // JobScheduler has been stopped.
         }
       } finally {
-        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
-        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
+        ssc.sparkContext.setLocalProperties(oldProps)
       }
     }
   }
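The last two hunks replace the old cleanup of two individual keys with a full save/clone/restore of the scheduler thread's local properties: run() snapshots the current properties, installs a deep copy of those captured at submission time (ssc.savedProperties), and restores the snapshot in the finally block even if the job fails. The deep copy matters because java.util.Properties is mutable and can be shared across threads. A standalone sketch of the pattern, with illustrative names (threadLocalProps stands in for SparkContext's property thread-local, saved for ssc.savedProperties):

import java.util.Properties
import org.apache.commons.lang.SerializationUtils

val saved = new Properties()
saved.setProperty("spark.jobGroup.id", "streaming-job-group")

def runWithProperties(threadLocalProps: ThreadLocal[Properties])(body: => Unit): Unit = {
  val oldProps = threadLocalProps.get()  // snapshot the caller's properties
  try {
    // Deep-copy so later mutations of `saved` cannot leak into this job
    // (and vice versa); SerializationUtils.clone returns Object, hence the cast.
    threadLocalProps.set(SerializationUtils.clone(saved).asInstanceOf[Properties])
    body
  } finally {
    threadLocalProps.set(oldProps)  // always restore, mirroring the finally above
  }
}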