diff options
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala | 6 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 3 |
2 files changed, 6 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index 9697437dd2..0b306a28d1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for last received batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. registerGaugeWithOption("lastReceivedBatch_submissionTime", - _.lastCompletedBatch.map(_.submissionTime), -1L) + _.lastReceivedBatch.map(_.submissionTime), -1L) registerGaugeWithOption("lastReceivedBatch_processingStartTime", - _.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + _.lastReceivedBatch.flatMap(_.processingStartTime), -1L) registerGaugeWithOption("lastReceivedBatch_processingEndTime", - _.lastCompletedBatch.flatMap(_.processingEndTime), -1L) + _.lastReceivedBatch.flatMap(_.processingEndTime), -1L) // Gauge for last received batch records. registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 26b757cc2d..46ab3ac8de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (Nil) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted))) listener.lastCompletedBatch should be (None) listener.numUnprocessedBatches should be (1) listener.numTotalCompletedBatches should be (0) @@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (Nil) listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) listener.retainedCompletedBatches should be (Nil) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted))) listener.lastCompletedBatch should be (None) listener.numUnprocessedBatches should be (1) listener.numTotalCompletedBatches should be (0) @@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (Nil) listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted))) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.numUnprocessedBatches should be (0) listener.numTotalCompletedBatches should be (1) |