diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-02-17 22:44:16 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-02-17 22:44:16 -0800 |
commit | 3912d332464dcd124c60b734724c34d9742466a4 (patch) | |
tree | e9a4fc0b299717e916f0a1ba95027b60b7408e1d | |
parent | e50934f11e1e3ded21a631e5ab69db3c79467137 (diff) | |
download | spark-3912d332464dcd124c60b734724c34d9742466a4.tar.gz spark-3912d332464dcd124c60b734724c34d9742466a4.tar.bz2 spark-3912d332464dcd124c60b734724c34d9742466a4.zip |
[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite
The test was incorrect. Instead of counting the number of records, it counted the number of partitions of RDD generated by DStream. Which is not its intention. I will be testing this patch multiple times to understand its flakiness.
PS: This was caused by my refactoring in https://github.com/apache/spark/pull/4384/
koeninger check it out.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:
d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 926094449e..17ca9d145d 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka import java.io.File import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata class DirectKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with BeforeAndAfterAll with Eventually { @@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } - ignore("basic stream receiving with multiple topics and smallest starting offset") { + test("basic stream receiving with multiple topics and smallest starting offset") { val topics = Set("basic1", "basic2", "basic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => createTopic(t) sendMessages(t, data) } + val totalSent = data.values.sum * topics.size val kafkaParams = Map( "metadata.broker.list" -> s"$brokerAddress", "auto.offset.reset" -> "smallest" @@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics) } - var total = 0L + + val allReceived = new ArrayBuffer[(String, String)] stream.foreachRDD { rdd => // Get the offset ranges in the RDD @@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase collected.foreach { case (partSize, rangeSize) => assert(partSize === rangeSize, "offset ranges are wrong") } - total += collected.size // Add up all the collected items } + stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { - assert(total === data.values.sum * topics.size, "didn't get all messages") + assert(allReceived.size === totalSent, + "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) } ssc.stop() } - ignore("receiving from largest starting offset") { + test("receiving from largest starting offset") { val topic = "largest" val topicPartition = TopicAndPartition(topic, 0) val data = Map("a" -> 10) @@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } - ignore("creating stream by offset") { + test("creating stream by offset") { val topic = "offset" val topicPartition = TopicAndPartition(topic, 0) val data = Map("a" -> 10) @@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } // Test to verify the offset ranges can be recovered from the checkpoints - ignore("offset recovery") { + test("offset recovery") { val topic = "recovery" createTopic(topic) testDir = Utils.createTempDir() |