author | Shixiong Zhu <shixiong@databricks.com> | 2016-05-10 13:26:53 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-10 13:26:53 -0700 |
commit | 9533f5390a3ad7ab96a7bea01cdb6aed89503a51 (patch) | |
tree | caef694bf7758ba3138fb7232f052e3e5104e5da | |
parent | 603c4f8ebde3ff61ac84a9460e6602c38a176e9f (diff) | |
[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery
## What changes were proposed in this pull request?
Because this test extracts data from `DStream.generatedRDDs` before stopping the context, it may take its snapshot before the latest batches have been generated and checkpointed. After recovering from the checkpoint, `recoveredOffsetRanges` may then contain offset ranges that are not in `offsetRangesBeforeStop`, which fails the test. Adding `Thread.sleep(1000)` before `ssc.stop()` reproduces this failure.
This PR moves the collection of `offsetRangesBeforeStop` (renamed to `offsetRangesAfterStop`) to after `ssc.stop()` to fix the flaky test.
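The ordering problem can be illustrated with a minimal, Spark-free sketch. `FakeStream`, `generateBatch`, and `snapshot` are hypothetical stand-ins invented for this illustration. They are not Spark APIs, and the sketch assumes the recovered state simply equals everything generated before the stop.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a running DStream; NOT a Spark class.
final class FakeStream {
  // batch time -> offset ranges, modelled here as (fromOffset, untilOffset) pairs
  private val generated = mutable.LinkedHashMap.empty[Long, Set[(Int, Int)]]
  private var nextOffset = 0
  private var running = true

  // Generate one batch covering the next `n` offsets; a no-op once stopped.
  def generateBatch(time: Long, n: Int): Unit =
    if (running) {
      generated(time) = Set((nextOffset, nextOffset + n))
      nextOffset += n
    }

  def stop(): Unit = running = false

  // Immutable copy of everything generated so far.
  def snapshot: Seq[(Long, Set[(Int, Int)])] = generated.toSeq
}

object OffsetRaceSketch extends App {
  val stream = new FakeStream
  stream.generateBatch(1000L, 5)

  val beforeStop = stream.snapshot   // old test order: snapshot taken first
  stream.generateBatch(2000L, 5)     // another batch slips in before stop()
  stream.stop()
  val afterStop = stream.snapshot    // fixed order: snapshot taken after stop()

  // Assumption: recovery restores everything generated before the stop.
  val recovered = stream.snapshot

  assert(!recovered.forall(beforeStop.contains)) // old order misses the late batch
  assert(recovered.forall(afterStop.contains))   // new order sees every batch
}
```

In the sketch, as in the test, no further batches can appear after the stop, so a snapshot taken at that point is a superset of whatever the recovery step can return.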
## How was this patch tested?
Jenkins unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12903 from zsxwing/SPARK-6005.
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff6705f..cb782d27fe 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
       sendDataAndWaitForReceive(i)
     }
 
+    ssc.stop()
+
     // Verify that offset ranges were generated
-    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
     assert(
-      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
       "starting offset not zero"
     )
-    ssc.stop()
+
     logInfo("====== RESTARTING ========")
 
     // Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
     // Verify offset ranges have been recovered
     val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
         earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
       },
-      "Recovered ranges are not the same as the ones generated"
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once