aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala12
1 files changed, 10 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 61f852a0d3..95f582106c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
-private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
+private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
extends SparkListener with StreamingListener {
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
@@ -39,6 +39,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
+ private var _startTime = -1L
+
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
@@ -66,6 +68,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
val batchDuration = ssc.graph.batchDuration.milliseconds
+ override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) {
+ _startTime = streamingStarted.time
+ }
+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
synchronized {
receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
@@ -152,6 +158,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
+ def startTime: Long = _startTime
+
def numReceivers: Int = synchronized {
receiverInfos.size
}
@@ -267,7 +275,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
-private[streaming] object StreamingJobProgressListener {
+private[spark] object StreamingJobProgressListener {
type SparkJobId = Int
type OutputOpId = Int
}