author     zsxwing <zsxwing@gmail.com>  2015-06-17 15:00:03 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-06-17 15:00:03 -0700
commit     a06d9c8e76bb904d48764802aa3affff93b00baa (patch)
tree       e811edc0ca1df4d0110f70549008f8f037de4752
parent     302556ff999ba9a1960281de6932e0d904197204 (diff)
[SPARK-8404] [STREAMING] [TESTS] Use thread-safe collections to make the tests more reliable
KafkaStreamSuite, DirectKafkaStreamSuite, JavaKafkaStreamSuite and JavaDirectKafkaStreamSuite use non-thread-safe collections to collect data in one thread and check it in another thread, which can make the tests fail intermittently. This PR changes them to thread-safe collections.

Note: I cannot reproduce the test failures in my environment, but this PR should at least make the tests more reliable.

Author: zsxwing <zsxwing@gmail.com>

Closes #6852 from zsxwing/fix-KafkaStreamSuite and squashes the following commits:

d464211 [zsxwing] Use thread-safe collections to make the tests more reliable
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java  |  6
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java        |  6
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala    | 14
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala          |  7
4 files changed, 14 insertions(+), 19 deletions(-)
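The fix follows two standard patterns: the Scala suites mix scala.collection.mutable.SynchronizedBuffer (and SynchronizedMap) into the collecting collections, while the Java suites wrap theirs with Collections.synchronizedSet and Collections.synchronizedMap. Below is a minimal sketch of the Scala mixin, which wraps every individual buffer operation in a synchronized block on the buffer itself (object and record names are illustrative; the pattern targets the Scala 2.10 toolchain Spark used at the time — SynchronizedBuffer was deprecated in Scala 2.11 and removed later):

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object SynchronizedBufferSketch {
  def main(args: Array[String]): Unit = {
    // Mixing in SynchronizedBuffer wraps each buffer operation
    // (+=, ++=, length, apply, ...) in a synchronized block on the buffer.
    val buffer = new ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]

    // Writer thread: stands in for the foreachRDD callback that
    // appends collected records in the streaming suites.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        for (i <- 1 to 1000) buffer += s"record-$i"
      }
    })
    writer.start()

    // Reader (the test thread): polls the buffer until the expected
    // number of records has arrived, as the suites' eventually blocks do.
    while (buffer.size < 1000) Thread.sleep(10)
    writer.join()
    println(s"received ${buffer.size} records")
  }
}
```

Note that the mixin only makes single operations atomic; a compound sequence such as check-then-append would still need an explicit buffer.synchronized block.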
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 4c1d6a03eb..c0669fb336 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -18,9 +18,7 @@
package org.apache.spark.streaming.kafka;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Arrays;
+import java.util.*;
import scala.Tuple2;
@@ -116,7 +114,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
);
JavaDStream<String> unifiedStream = stream1.union(stream2);
- final HashSet<String> result = new HashSet<String>();
+ final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
unifiedStream.foreachRDD(
new Function<JavaRDD<String>, Void>() {
@Override
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 540f4ceaba..e4c659215b 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,9 +18,7 @@
package org.apache.spark.streaming.kafka;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
import scala.Tuple2;
@@ -94,7 +92,7 @@ public class JavaKafkaStreamSuite implements Serializable {
topics,
StorageLevel.MEMORY_ONLY_SER());
- final HashMap<String, Long> result = new HashMap<String, Long>();
+ final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
JavaDStream<String> words = stream.map(
new Function<Tuple2<String, String>, String>() {
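Collections.synchronizedSet and Collections.synchronizedMap, used in the two Java suites above, synchronize each individual call, but the JDK javadoc requires callers to hold the wrapper's own monitor while iterating. A small sketch of that caveat, written in Scala against the same java.util API (names are illustrative):

```scala
import java.util.{Collections, HashSet => JHashSet, Set => JSet}

object SynchronizedWrapperSketch {
  def main(args: Array[String]): Unit = {
    val result: JSet[String] = Collections.synchronizedSet(new JHashSet[String]())

    // Individual operations go through the wrapper's lock and are safe:
    result.add("a")
    result.add("b")

    // Iteration is NOT a single operation; the javadoc requires holding the
    // wrapper's monitor for the whole traversal to avoid seeing a
    // ConcurrentModificationException or a torn view of the set.
    result.synchronized {
      val it = result.iterator()
      while (it.hasNext) println(it.next())
    }
  }
}
```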
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 47bbfb6058..212eb35c61 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -99,7 +99,8 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, topics)
}
- val allReceived = new ArrayBuffer[(String, String)]
+ val allReceived =
+ new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
stream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
@@ -162,7 +163,7 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]()
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
@@ -208,7 +209,7 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]()
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
@@ -324,7 +325,8 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, Set(topic))
}
- val allReceived = new ArrayBuffer[(String, String)]
+ val allReceived =
+ new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
ssc.start()
@@ -350,8 +352,8 @@ class DirectKafkaStreamSuite
}
object DirectKafkaStreamSuite {
- val collectedData = new mutable.ArrayBuffer[String]()
- var total = -1L
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ @volatile var total = -1L
class InputInfoCollector extends StreamingListener {
val numRecordsSubmitted = new AtomicLong(0L)
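Besides the buffer mixin, this hunk marks total as @volatile, which compiles to a Java volatile field: a write on the streaming thread becomes visible to the test thread without further locking, and reads and writes of the 64-bit Long are guaranteed atomic, which plain long fields are not on 32-bit JVMs. A minimal visibility sketch (names illustrative):

```scala
object VolatileSketch {
  // @volatile compiles to a Java volatile field: writes by one thread
  // become visible to reads in other threads without extra locking.
  @volatile var total = -1L

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100)
        total = 42L // stands in for the foreachRDD update in the suite
      }
    }).start()

    // The main thread polls until it observes the write. Without
    // @volatile the JVM would be free to keep returning a stale -1L here.
    while (total == -1L) Thread.sleep(10)
    println(s"observed total = $total")
  }
}
```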
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 8ee2cc660f..797b07f80d 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]()
+ val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
stream.map(_._2).countByValue().foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
@@ -77,10 +77,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- assert(sent.size === result.size)
- sent.keys.foreach { k =>
- assert(sent(k) === result(k).toInt)
- }
+ assert(sent === result)
}
}
}
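The tightened assertion still runs inside ScalaTest's eventually, which re-executes the block at each interval until it passes or fails the test when the timeout expires. A hedged sketch of that polling pattern, assuming ScalaTest's Eventually, SpanSugar, and Assertions are on the classpath (object and variable names are illustrative):

```scala
import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  def main(args: Array[String]): Unit = {
    @volatile var received = 0

    // Background thread: stands in for the streaming job delivering records.
    new Thread(new Runnable {
      override def run(): Unit = {
        for (_ <- 1 to 10) { Thread.sleep(50); received += 1 }
      }
    }).start()

    // eventually retries the block on every interval tick until the
    // assertion holds, or throws once the timeout is exhausted.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(received == 10)
    }
    println("all records observed")
  }
}
```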