From c9c1c0e54d34773ac2cf5457fe5925559ece36c7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 24 May 2016 22:01:40 -0700 Subject: [SPARK-15508][STREAMING][TESTS] Fix flaky test: JavaKafkaStreamSuite.testKafkaStream ## What changes were proposed in this pull request? `JavaKafkaStreamSuite.testKafkaStream` assumes when `sent.size == result.size`, the contents of `sent` and `result` should be same. However, that's not true. The content of `result` may not be the final content. This PR modified the test to always retry the assertions even if the contents of `sent` and `result` are not same. Here is the failure in Jenkins: http://spark-tests.appspot.com/tests/org.apache.spark.streaming.kafka.JavaKafkaStreamSuite/testKafkaStream ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu Closes #13281 from zsxwing/flaky-kafka-test. --- .../spark/streaming/kafka/JavaKafkaStreamSuite.java | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'external') diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 868df64e8c..98fe38e826 100644 --- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable { ssc.start(); long startTime = System.currentTimeMillis(); - boolean sizeMatches = false; - while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) { - sizeMatches = sent.size() == result.size(); + AssertionError lastError = null; + while (System.currentTimeMillis() - startTime < 20000) { + try { + Assert.assertEquals(sent.size(), result.size()); + for (Map.Entry e : sent.entrySet()) { + Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); + } + return; + } catch (AssertionError e) { + lastError = e; + } Thread.sleep(200); } - Assert.assertEquals(sent.size(), result.size()); - for (Map.Entry e : sent.entrySet()) { - Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); + if (lastError != null) { + throw lastError; + } else { + Assert.fail("timeout"); } } } -- cgit v1.2.3