diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 61f852a0d3..95f582106c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -27,7 +27,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.scheduler._ -private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) +private[spark] class StreamingJobProgressListener(ssc: StreamingContext) extends SparkListener with StreamingListener { private val waitingBatchUIData = new HashMap[Time, BatchUIData] @@ -39,6 +39,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) private var totalProcessedRecords = 0L private val receiverInfos = new HashMap[Int, ReceiverInfo] + private var _startTime = -1L + // Because onJobStart and onBatchXXX messages are processed in different threads, // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we // cannot use a map of (Time, BatchUIData). @@ -66,6 +68,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) val batchDuration = ssc.graph.batchDuration.milliseconds + override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { + _startTime = streamingStarted.time + } + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { synchronized { receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo @@ -152,6 +158,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } + def startTime: Long = _startTime + def numReceivers: Int = synchronized { receiverInfos.size } @@ -267,7 +275,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } -private[streaming] object StreamingJobProgressListener { +private[spark] object StreamingJobProgressListener { type SparkJobId = Int type OutputOpId = Int } |