aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala20
1 files changed, 14 insertions, 6 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff6705f..cb782d27fe 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
sendDataAndWaitForReceive(i)
}
+ ssc.stop()
+
// Verify that offset ranges were generated
- val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
- assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+ // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+ // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+ // contain something not in "offsetRangesAfterStop".
+ val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
assert(
- offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
"starting offset not zero"
)
- ssc.stop()
+
logInfo("====== RESTARTING ========")
// Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
// Verify offset ranges have been recovered
val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
- val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
assert(
recoveredOffsetRanges.forall { or =>
earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
},
- "Recovered ranges are not the same as the ones generated"
+ "Recovered ranges are not the same as the ones generated\n" +
+ s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+ s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
)
// Restart context, give more data and verify the total at the end
// If the total is write that means each records has been received only once