diff options
author | zsxwing <zsxwing@gmail.com> | 2015-06-17 15:00:03 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-06-17 15:00:03 -0700 |
commit | a06d9c8e76bb904d48764802aa3affff93b00baa (patch) | |
tree | e811edc0ca1df4d0110f70549008f8f037de4752 /external/kafka/src | |
parent | 302556ff999ba9a1960281de6932e0d904197204 (diff) | |
download | spark-a06d9c8e76bb904d48764802aa3affff93b00baa.tar.gz spark-a06d9c8e76bb904d48764802aa3affff93b00baa.tar.bz2 spark-a06d9c8e76bb904d48764802aa3affff93b00baa.zip |
[SPARK-8404] [STREAMING] [TESTS] Use thread-safe collections to make the tests more reliable
KafkaStreamSuite, DirectKafkaStreamSuite, JavaKafkaStreamSuite and JavaDirectKafkaStreamSuite use non-thread-safe collections to collect data in one thread and check it in another thread. It may fail the tests.
This PR changes them to thread-safe collections.
Note: I cannot reproduce the test failures in my environment. But at least, this PR should make the tests more reliable.
Author: zsxwing <zsxwing@gmail.com>
Closes #6852 from zsxwing/fix-KafkaStreamSuite and squashes the following commits:
d464211 [zsxwing] Use thread-safe collections to make the tests more reliable
Diffstat (limited to 'external/kafka/src')
4 files changed, 14 insertions, 19 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 4c1d6a03eb..c0669fb336 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -18,9 +18,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Arrays; +import java.util.*; import scala.Tuple2; @@ -116,7 +114,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { ); JavaDStream<String> unifiedStream = stream1.union(stream2); - final HashSet<String> result = new HashSet<String>(); + final Set<String> result = Collections.synchronizedSet(new HashSet<String>()); unifiedStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 540f4ceaba..e4c659215b 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,9 +18,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Random; +import java.util.*; import scala.Tuple2; @@ -94,7 +92,7 @@ public class JavaKafkaStreamSuite implements Serializable { topics, StorageLevel.MEMORY_ONLY_SER()); - final HashMap<String, Long> result = new HashMap<String, Long>(); + final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>()); JavaDStream<String> words = stream.map( new Function<Tuple2<String, String>, String>() { diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 47bbfb6058..212eb35c61 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -99,7 +99,8 @@ class DirectKafkaStreamSuite ssc, kafkaParams, topics) } - val allReceived = new ArrayBuffer[(String, String)] + val allReceived = + new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] stream.foreachRDD { rdd => // Get the offset ranges in the RDD @@ -162,7 +163,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -208,7 +209,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] stream.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -324,7 +325,8 @@ class DirectKafkaStreamSuite ssc, kafkaParams, Set(topic)) } - val allReceived = new ArrayBuffer[(String, String)] + val allReceived = + new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() @@ -350,8 +352,8 @@ class DirectKafkaStreamSuite } object DirectKafkaStreamSuite { - val collectedData = new mutable.ArrayBuffer[String]() - var total = -1L + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + @volatile var total = -1L class InputInfoCollector extends StreamingListener { val numRecordsSubmitted = new AtomicLong(0L) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 8ee2cc660f..797b07f80d 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() + val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long] stream.map(_._2).countByValue().foreachRDD { r => val ret = r.collect() ret.toMap.foreach { kv => @@ -77,10 +77,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter ssc.start() eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(sent.size === result.size) - sent.keys.foreach { k => - assert(sent(k) === result(k).toInt) - } + assert(sent === result) } } } |