aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-8
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-05-24 22:01:40 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-05-24 22:01:40 -0700
commitc9c1c0e54d34773ac2cf5457fe5925559ece36c7 (patch)
treeebafa9efe64f0917fae0e984da6966e850db03ee /external/kafka-0-8
parent50b660d725269dc0c11e0d350ddd7fc8b19539a0 (diff)
downloadspark-c9c1c0e54d34773ac2cf5457fe5925559ece36c7.tar.gz
spark-c9c1c0e54d34773ac2cf5457fe5925559ece36c7.tar.bz2
spark-c9c1c0e54d34773ac2cf5457fe5925559ece36c7.zip
[SPARK-15508][STREAMING][TESTS] Fix flaky test: JavaKafkaStreamSuite.testKafkaStream
## What changes were proposed in this pull request? `JavaKafkaStreamSuite.testKafkaStream` assumes when `sent.size == result.size`, the contents of `sent` and `result` should be same. However, that's not true. The content of `result` may not be the final content. This PR modified the test to always retry the assertions even if the contents of `sent` and `result` are not same. Here is the failure in Jenkins: http://spark-tests.appspot.com/tests/org.apache.spark.streaming.kafka.JavaKafkaStreamSuite/testKafkaStream ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13281 from zsxwing/flaky-kafka-test.
Diffstat (limited to 'external/kafka-0-8')
-rw-r--r--external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java21
1 files changed, 15 insertions, 6 deletions
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 868df64e8c..98fe38e826 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable {
ssc.start();
long startTime = System.currentTimeMillis();
- boolean sizeMatches = false;
- while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
- sizeMatches = sent.size() == result.size();
+ AssertionError lastError = null;
+ while (System.currentTimeMillis() - startTime < 20000) {
+ try {
+ Assert.assertEquals(sent.size(), result.size());
+ for (Map.Entry<String, Integer> e : sent.entrySet()) {
+ Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
+ }
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ }
Thread.sleep(200);
}
- Assert.assertEquals(sent.size(), result.size());
- for (Map.Entry<String, Integer> e : sent.entrySet()) {
- Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
+ if (lastError != null) {
+ throw lastError;
+ } else {
+ Assert.fail("timeout");
}
}
}