author     Tathagata Das <tathagata.das1565@gmail.com>   2015-02-17 22:44:16 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-02-17 22:44:16 -0800
commit     3912d332464dcd124c60b734724c34d9742466a4 (patch)
tree       e9a4fc0b299717e916f0a1ba95027b60b7408e1d /external
parent     e50934f11e1e3ded21a631e5ab69db3c79467137 (diff)
[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite
The test was incorrect: instead of counting the number of records, it counted the number of partitions of the RDD generated by the DStream, which was not its intention. I will be testing this patch multiple times to understand its flakiness.

PS: This was caused by my refactoring in https://github.com/apache/spark/pull/4384/. koeninger, check it out.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test
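For context, the change replaces partition counting with record counting: the test now gathers the actual (key, value) pairs delivered in each batch and compares their total against the number of messages sent. The standalone sketch below illustrates that distinction using a queueStream instead of Kafka, so it needs only a local Spark Streaming dependency; the object name, the queue-based input, and the sleep-based wait are illustrative stand-ins for the suite's Kafka setup and Eventually-based polling, not part of the patch.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Minimal sketch, not the suite code: shows why accumulating rdd.collect() counts
// records, while summing per-partition entries (the old assertion) counts partitions.
object CountRecordsNotPartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("count-records-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(200))
    val sc = ssc.sparkContext

    // Two batches of 5 records each, deliberately spread over 3 partitions per RDD.
    val queue = mutable.Queue[RDD[(String, String)]](
      sc.parallelize((1 to 5).map(i => (s"k$i", s"v$i")), numSlices = 3),
      sc.parallelize((6 to 10).map(i => (s"k$i", s"v$i")), numSlices = 3))
    val stream = ssc.queueStream(queue)

    val allReceived = new ArrayBuffer[(String, String)]  // record-level accumulation
    var partitionsSeen = 0L                              // what the broken assertion measured

    stream.foreachRDD { rdd =>
      allReceived ++= rdd.collect()          // 10 records in total across both batches
      partitionsSeen += rdd.partitions.size  // only 6 partitions in total
    }

    ssc.start()
    Thread.sleep(2000)  // crude wait for both batches; the real test polls with eventually(...)
    ssc.stop()

    println(s"records received   = ${allReceived.size}")  // 10, matches the messages sent
    println(s"partitions counted = $partitionsSeen")      // 6, does not match
  }
}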
Diffstat (limited to 'external')
-rw-r--r--   external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 28
1 file changed, 16 insertions, 12 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 926094449e..17ca9d145d 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
import java.io.File
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
- ignore("basic stream receiving with multiple topics and smallest starting offset") {
+ test("basic stream receiving with multiple topics and smallest starting offset") {
val topics = Set("basic1", "basic2", "basic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
createTopic(t)
sendMessages(t, data)
}
+ val totalSent = data.values.sum * topics.size
val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
}
- var total = 0L
+
+ val allReceived = new ArrayBuffer[(String, String)]
stream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
}
- total += collected.size // Add up all the collected items
}
+ stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(total === data.values.sum * topics.size, "didn't get all messages")
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
}
ssc.stop()
}
- ignore("receiving from largest starting offset") {
+ test("receiving from largest starting offset") {
val topic = "largest"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
- ignore("creating stream by offset") {
+ test("creating stream by offset") {
val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
// Test to verify the offset ranges can be recovered from the checkpoints
- ignore("offset recovery") {
+ test("offset recovery") {
val topic = "recovery"
createTopic(topic)
testDir = Utils.createTempDir()