Diffstat (limited to 'external')
-rw-r--r--   external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala   28
1 file changed, 16 insertions(+), 12 deletions(-)
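
This patch re-enables the previously ignored DirectKafkaStreamSuite tests and, in the multi-topic test, replaces the running total counter with an ArrayBuffer of the collected records, so a failing assertion can print the messages that were actually received. A minimal sketch of the new collection-and-assert pattern follows; ssc, stream, and totalSent are the names used in the test, and the Kafka broker plus DStream setup are assumed to be in place as in the rest of the suite:

    import scala.collection.mutable.ArrayBuffer
    import scala.concurrent.duration._

    // Accumulate every record delivered by the direct stream.
    val allReceived = new ArrayBuffer[(String, String)]
    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }

    ssc.start()
    // Poll until all sent messages have arrived or the timeout expires,
    // dumping the received messages on failure for easier debugging.
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
    }
    ssc.stop()
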
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 926094449e..17ca9d145d 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
import java.io.File
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
- ignore("basic stream receiving with multiple topics and smallest starting offset") {
+ test("basic stream receiving with multiple topics and smallest starting offset") {
val topics = Set("basic1", "basic2", "basic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
createTopic(t)
sendMessages(t, data)
}
+ val totalSent = data.values.sum * topics.size
val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
}
- var total = 0L
+
+ val allReceived = new ArrayBuffer[(String, String)]
stream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
}
- total += collected.size // Add up all the collected items
}
+ stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(total === data.values.sum * topics.size, "didn't get all messages")
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
}
ssc.stop()
}
- ignore("receiving from largest starting offset") {
+ test("receiving from largest starting offset") {
val topic = "largest"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
- ignore("creating stream by offset") {
+ test("creating stream by offset") {
val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
// Test to verify the offset ranges can be recovered from the checkpoints
- ignore("offset recovery") {
+ test("offset recovery") {
val topic = "recovery"
createTopic(topic)
testDir = Utils.createTempDir()