diff options
author | Xin Ren <iamshrek@126.com> | 2016-08-17 16:31:42 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-08-17 16:31:42 -0700 |
commit | e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b (patch) | |
tree | 319b48b2da22bf9a98a3140e40ee4ed2d68d8d62 /streaming/src/test/scala | |
parent | d60af8f6aa53373de1333cc642cf2a9d7b39d912 (diff) | |
download | spark-e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b.tar.gz spark-e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b.tar.bz2 spark-e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b.zip |
[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'
https://issues.apache.org/jira/browse/SPARK-17038
## What changes were proposed in this pull request?
StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingTimeStart, and lastReceivedBatch_processingTimeEnd all use data from lastCompletedBatch instead of lastReceivedBatch.
In particular, this makes it impossible to match lastReceivedBatch_records with a batchID/submission time.
This is apparent when looking at StreamingSource.scala, lines 89-94.
## How was this patch tested?
Manually running unit tests on local laptop
Author: Xin Ren <iamshrek@126.com>
Closes #14681 from keypointt/SPARK-17038.
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 26b757cc2d..46ab3ac8de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (Nil) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted))) listener.lastCompletedBatch should be (None) listener.numUnprocessedBatches should be (1) listener.numTotalCompletedBatches should be (0) @@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (Nil) listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) listener.retainedCompletedBatches should be (Nil) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted))) listener.lastCompletedBatch should be (None) listener.numUnprocessedBatches should be (1) listener.numTotalCompletedBatches should be (0) @@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.waitingBatches should be (Nil) listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted))) + listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.numUnprocessedBatches should be (0) listener.numTotalCompletedBatches should be (1) |