aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-05-10 13:26:53 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-05-10 13:26:53 -0700
commit9533f5390a3ad7ab96a7bea01cdb6aed89503a51 (patch)
treecaef694bf7758ba3138fb7232f052e3e5104e5da /external
parent603c4f8ebde3ff61ac84a9460e6602c38a176e9f (diff)
downloadspark-9533f5390a3ad7ab96a7bea01cdb6aed89503a51.tar.gz
spark-9533f5390a3ad7ab96a7bea01cdb6aed89503a51.tar.bz2
spark-9533f5390a3ad7ab96a7bea01cdb6aed89503a51.zip
[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery
## What changes were proposed in this pull request? Because this test extracts data from `DStream.generatedRDDs` before stopping, it may get data before checkpointing. Then after recovering from the checkpoint, `recoveredOffsetRanges` may contain something not in `offsetRangesBeforeStop`, which will fail the test. Adding `Thread.sleep(1000)` before `ssc.stop()` will reproduce this failure. This PR just moves the logic of `offsetRangesBeforeStop` (also renamed to `offsetRangesAfterStop`) after `ssc.stop()` to fix the flaky test. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12903 from zsxwing/SPARK-6005.
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala20
1 files changed, 14 insertions, 6 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff6705f..cb782d27fe 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
sendDataAndWaitForReceive(i)
}
+ ssc.stop()
+
// Verify that offset ranges were generated
- val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
- assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+ // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+ // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+ // contain something not in "offsetRangesAfterStop".
+ val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
assert(
- offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
"starting offset not zero"
)
- ssc.stop()
+
logInfo("====== RESTARTING ========")
// Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
// Verify offset ranges have been recovered
val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
- val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
assert(
recoveredOffsetRanges.forall { or =>
earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
},
- "Recovered ranges are not the same as the ones generated"
+ "Recovered ranges are not the same as the ones generated\n" +
+ s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+ s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
)
// Restart context, give more data and verify the total at the end
// If the total is write that means each records has been received only once