author     cody koeninger <cody@koeninger.org>           2016-07-06 16:21:41 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-07-06 16:21:41 -0700
commit     b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59 (patch)
tree       22d63233b9b7f6c9e0d0e7a43dd8b5826561a5f6
parent     8e3e4ed6c090d18675d49eec46b3ee572457db95 (diff)
[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well
## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from development on 0-10. The main changes are:

- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter),
- increasing the uniqueness of topic names.

## How was this patch tested?

Unit tests.

Author: cody koeninger <cody@koeninger.org>

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.
-rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 41
-rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala          |  8
2 files changed, 24 insertions(+), 25 deletions(-)
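The core of the change is replacing a per-send wait on a concurrent queue with a single eventually-guarded assertion on an atomic total at the end of the test. A minimal standalone sketch of that pattern follows; the object and names are hypothetical, and it assumes ScalaTest's `Eventually` and `SpanSugar` on the classpath plus Scala 2.12+ for the `Runnable` lambda:

```scala
import java.util.concurrent.atomic.AtomicLong

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Sketch (not the suite itself): work happens asynchronously, results land
// in a single AtomicLong, and the test makes one eventually-guarded
// assertion at the end instead of blocking after every send.
object EventuallyTotalSketch {
  val total = new AtomicLong(-1L)

  def main(args: Array[String]): Unit = {
    // Simulate batches arriving asynchronously, as the streaming job would.
    new Thread(() => total.set((1 to 10).sum)).start()

    // Retry the assertion every 50 ms until it passes or 10 s elapse.
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(total.get == (1 to 10).sum)
    }
  }
}
```

Polling one final invariant this way tends to be less flaky than asserting on intermediate state after each send, since it makes no assumptions about batch boundaries.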
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d27fe..ab1c5055a2 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
)
// Send data to Kafka and wait for it to be received
- def sendDataAndWaitForReceive(data: Seq[Int]) {
+ def sendData(data: Seq[Int]) {
val strings = data.map { _.toString}
kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
- }
}
// Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
}
ssc.checkpoint(testDir.getAbsolutePath)
- // This is to collect the raw data received from Kafka
- kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
- val data = rdd.map { _._2 }.collect()
- DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
- }
-
// This is to ensure all the data is eventually received only once
stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
- rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+ rdd.collect().headOption.foreach { x =>
+ DirectKafkaStreamSuite.total.set(x._2)
+ }
}
ssc.start()
- // Send some data and wait for them to be received
+ // Send some data
for (i <- (1 to 10).grouped(4)) {
- sendDataAndWaitForReceive(i)
+ sendData(i)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
// Verify offset ranges have been recovered
- val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
- val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+ val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
assert(
recoveredOffsetRanges.forall { or =>
- earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+ earlierOffsetRanges.contains((or._1, or._2))
},
"Recovered ranges are not the same as the ones generated\n" +
s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
- s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+ s"earlierOffsetRanges: $earlierOffsetRanges"
)
// Restart context, give more data and verify the total at the end
// If the total is right that means each record has been received only once
ssc.start()
- sendDataAndWaitForReceive(11 to 20)
+ for (i <- (11 to 20).grouped(4)) {
+ sendData(i)
+ }
+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+ assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
}
@@ -488,8 +488,7 @@ class DirectKafkaStreamSuite
}
object DirectKafkaStreamSuite {
- val collectedData = new ConcurrentLinkedQueue[String]()
- @volatile var total = -1L
+ val total = new AtomicLong(-1L)
class InputInfoCollector extends StreamingListener {
val numRecordsSubmitted = new AtomicLong(0L)
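The `@volatile var` to `AtomicLong` swap above is, as the commit message hedges, probably immaterial for this suite's single-writer set/get usage. A short sketch (hypothetical object, not from the patch) of where the difference would actually show up:

```scala
import java.util.concurrent.atomic.AtomicLong

// For plain reads and writes, @volatile and AtomicLong behave the same:
// both guarantee visibility across threads. AtomicLong additionally makes
// read-modify-write sequences atomic, which @volatile does not.
object CounterSketch {
  @volatile var volatileTotal: Long = -1L
  val atomicTotal = new AtomicLong(-1L)

  def update(n: Long): Unit = {
    volatileTotal = n    // plain write: fine under @volatile
    atomicTotal.set(n)   // equivalent plain write

    // volatileTotal += 1 would be a lost-update race under contention;
    // the atomic version is safe:
    atomicTotal.addAndGet(1L)
  }
}
```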
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 5e539c1d79..809699a739 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,13 +53,13 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
}
test("basic usage") {
- val topic = s"topicbasic-${Random.nextInt}"
+ val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
kafkaTestUtils.createTopic(topic)
val messages = Array("the", "quick", "brown", "fox")
kafkaTestUtils.sendMessages(topic, messages)
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt}")
+ "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
@@ -92,12 +92,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
- val topic = s"topicboundary-${Random.nextInt}"
+ val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt}")
+ "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
val kc = new KafkaCluster(kafkaParams)
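The topic and group-id changes in this file all follow one scheme: a random int alone can collide across fast or concurrent test runs, so a timestamp is appended as well. A hypothetical helper capturing the pattern the patch applies inline:

```scala
import scala.util.Random

// Sketch: combine a random int with the current time so names from
// concurrent or rapidly repeated test runs do not collide.
object UniqueNames {
  def unique(prefix: String): String =
    s"$prefix-${Random.nextInt}-${System.currentTimeMillis}"
}

// UniqueNames.unique("topicbasic") yields e.g. "topicbasic-<int>-<millis>"
```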