aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala3
2 files changed, 6 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index 9697437dd2..0b306a28d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGaugeWithOption("lastReceivedBatch_submissionTime",
- _.lastCompletedBatch.map(_.submissionTime), -1L)
+ _.lastReceivedBatch.map(_.submissionTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingStartTime",
- _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+ _.lastReceivedBatch.flatMap(_.processingStartTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
- _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+ _.lastReceivedBatch.flatMap(_.processingEndTime), -1L)
// Gauge for last received batch records.
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 26b757cc2d..46ab3ac8de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (Nil)
+ listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted)))
listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1)
listener.numTotalCompletedBatches should be (0)
@@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
listener.retainedCompletedBatches should be (Nil)
+ listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted)))
listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1)
listener.numTotalCompletedBatches should be (0)
@@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
+ listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.numUnprocessedBatches should be (0)
listener.numTotalCompletedBatches should be (1)