aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-02-09 22:45:48 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-09 22:45:48 -0800
commitc15134632e74e3dee05eda20c6ef79915e15d02e (patch)
tree4c97e1c6b7951d97950a7ff45c43b79d60733ede /external/kafka/src/test
parentef2f55b97f58fa06acb30e9e0172fb66fba383bc (diff)
downloadspark-c15134632e74e3dee05eda20c6ef79915e15d02e.tar.gz
spark-c15134632e74e3dee05eda20c6ef79915e15d02e.tar.bz2
spark-c15134632e74e3dee05eda20c6ef79915e15d02e.zip
[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Changes - Added example - Added a critical unit test that verifies that offset ranges can be recovered through checkpoints Might add more changes. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #4384 from tdas/new-kafka-fixes and squashes the following commits: 7c931c3 [Tathagata Das] Small update 3ed9284 [Tathagata Das] updated scala doc 83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example. 26df23c [Tathagata Das] Updates based on PR comments from Cody e4abf69 [Tathagata Das] Scala doc improvements and stuff. bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite 50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs. e73589c [Tathagata Das] Minor changes. 4986784 [Tathagata Das] Added unit test to kafka offset recovery 6a91cab [Tathagata Das] Added example
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java159
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java5
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala302
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala24
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala92
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala8
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala62
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala4
8 files changed, 516 insertions, 140 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000000..1334cc8fd1
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient Random random = new Random();
+ private transient KafkaStreamSuiteBase suiteBase = null;
+
+ @Before
+ public void setUp() {
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
+ System.clearProperty("spark.driver.port");
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ System.clearProperty("spark.driver.port");
+ suiteBase.tearDownKafka();
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ HashSet<String> sent = new HashSet<String>();
+ sent.addAll(Arrays.asList(topic1data));
+ sent.addAll(Arrays.asList(topic2data));
+
+ HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+ kafkaParams.put("auto.offset.reset", "smallest");
+
+ JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topicToSet(topic1)
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(scala.Tuple2<String, String> kv) throws Exception {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ topicOffsetToMap(topic2, (long) 0),
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+ JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+ final HashSet<String> result = new HashSet<String>();
+ unifiedStream.foreachRDD(
+ new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
+ result.addAll(rdd.collect());
+ return null;
+ }
+ }
+ );
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ boolean matches = false;
+ while (!matches && System.currentTimeMillis() - startTime < 20000) {
+ matches = sent.size() == result.size();
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(sent, result);
+ ssc.stop();
+ }
+
+ private HashSet<String> topicToSet(String topic) {
+ HashSet<String> topicSet = new HashSet<String>();
+ topicSet.add(topic);
+ return topicSet;
+ }
+
+ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+ topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
+ return topicMap;
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ suiteBase.createTopic(topic);
+ suiteBase.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 6e1abf3f38..208cc51b29 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -79,9 +79,10 @@ public class JavaKafkaStreamSuite implements Serializable {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- suiteBase.produceAndSendMessage(topic,
+ suiteBase.sendMessages(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
- Predef.<Tuple2<String, Object>>conforms()));
+ Predef.<Tuple2<String, Object>>conforms())
+ );
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000000..b25c2120d5
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, Timeouts}
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.util.Utils
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+
+class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
+ with BeforeAndAfter with BeforeAndAfterAll with Eventually {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ var sc: SparkContext = _
+ var ssc: StreamingContext = _
+ var testDir: File = _
+
+ override def beforeAll {
+ setupKafka()
+ }
+
+ override def afterAll {
+ tearDownKafka()
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ sc = null
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testDir != null) {
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+
+ test("basic stream receiving with multiple topics and smallest starting offset") {
+ val topics = Set("basic1", "basic2", "basic3")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ createTopic(t)
+ sendMessages(t, data)
+ }
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ }
+ var total = 0L
+
+ stream.foreachRDD { rdd =>
+ // Get the offset ranges in the RDD
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsets(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify whether number of elements in each partition
+ // matches with the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ total += collected.size // Add up all the collected items
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(total === data.values.sum * topics.size, "didn't get all messages")
+ }
+ ssc.stop()
+ }
+
+ test("receiving from largest starting offset") {
+ val topic = "largest"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() > 3)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new mutable.ArrayBuffer[String]()
+ stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+
+ test("creating stream by offset") {
+ val topic = "offset"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() >= 10)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+ ssc, kafkaParams, Map(topicPartition -> 11L),
+ (m: MessageAndMetadata[String, String]) => m.message())
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new mutable.ArrayBuffer[String]()
+ stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ // Test to verify the offset ranges can be recovered from the checkpoints
+ test("offset recovery") {
+ val topic = "recovery"
+ createTopic(topic)
+ testDir = Utils.createTempDir()
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ // Send data to Kafka and wait for it to be received
+ def sendDataAndWaitForReceive(data: Seq[Int]) {
+ val strings = data.map { _.toString}
+ sendMessages(topic, strings.map { _ -> 1}.toMap)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
+ }
+ }
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val kafkaStream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+ val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+ Some(values.sum + state.getOrElse(0))
+ }
+ ssc.checkpoint(testDir.getAbsolutePath)
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ DirectKafkaStreamSuite.collectedData.appendAll(data)
+ }
+
+ // This is to ensure all the data is eventually received only once
+ stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+ rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+ }
+ ssc.start()
+
+ // Send some data and wait for them to be received
+ for (i <- (1 to 10).grouped(4)) {
+ sendDataAndWaitForReceive(i)
+ }
+
+ // Verify that offset ranges were generated
+ val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ assert(
+ offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ "starting offset not zero"
+ )
+ ssc.stop()
+ logInfo("====== RESTARTING ========")
+
+ // Recover context from checkpoints
+ ssc = new StreamingContext(testDir.getAbsolutePath)
+ val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+ // Verify offset ranges have been recovered
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+ assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+ val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ assert(
+ recoveredOffsetRanges.forall { or =>
+ earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+ },
+ "Recovered ranges are not the same as the ones generated"
+ )
+
+ // Restart context, give more data and verify the total at the end
+ // If the total is right that means each record has been received only once
+ ssc.start()
+ sendDataAndWaitForReceive(11 to 20)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+ }
+ ssc.stop()
+ }
+
+ /** Get the generated offset ranges from the DirectKafkaStream */
+ private def getOffsetRanges[K, V](
+ kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+ kafkaStream.generatedRDDs.mapValues { rdd =>
+ rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
+ }.toSeq.sortBy { _._1 }
+ }
+}
+
+object DirectKafkaStreamSuite {
+ val collectedData = new mutable.ArrayBuffer[String]()
+ var total = -1L
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index e57c8f6987..fc9275b720 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -19,33 +19,29 @@ package org.apache.spark.streaming.kafka
import scala.util.Random
-import org.scalatest.BeforeAndAfter
import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
-class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
- val brokerHost = "localhost"
-
- val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
-
- val kc = new KafkaCluster(kafkaParams)
-
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
val topic = "kcsuitetopic" + Random.nextInt(10000)
-
val topicAndPartition = TopicAndPartition(topic, 0)
+ var kc: KafkaCluster = null
- before {
+ override def beforeAll() {
setupKafka()
createTopic(topic)
- produceAndSendMessage(topic, Map("a" -> 1))
+ sendMessages(topic, Map("a" -> 1))
+ kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress"))
}
- after {
+ override def afterAll() {
tearDownKafka()
}
test("metadata apis") {
- val leader = kc.findLeaders(Set(topicAndPartition)).right.get
- assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+ val leaderAddress = s"${leader._1}:${leader._2}"
+ assert(leaderAddress === brokerAddress, "didn't get leader")
val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
deleted file mode 100644
index 0891ce344f..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-import scala.concurrent.duration._
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually
-
-import kafka.serializer.StringDecoder
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
- val sparkConf = new SparkConf()
- .setMaster("local[4]")
- .setAppName(this.getClass.getSimpleName)
-
- val brokerHost = "localhost"
-
- val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerHost:$brokerPort",
- "auto.offset.reset" -> "smallest"
- )
-
- var ssc: StreamingContext = _
-
- before {
- setupKafka()
-
- ssc = new StreamingContext(sparkConf, Milliseconds(500))
- }
-
- after {
- if (ssc != null) {
- ssc.stop()
- }
- tearDownKafka()
- }
-
- test("multi topic stream") {
- val topics = Set("newA", "newB")
- val data = Map("a" -> 7, "b" -> 9)
- topics.foreach { t =>
- createTopic(t)
- produceAndSendMessage(t, data)
- }
- val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics)
- var total = 0L;
-
- stream.foreachRDD { rdd =>
- val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
- val off = offsets(i)
- val all = iter.toSeq
- val partSize = all.size
- val rangeSize = off.untilOffset - off.fromOffset
- all.map { _ =>
- (partSize, rangeSize)
- }.toIterator
- }.collect
- collected.foreach { case (partSize, rangeSize) =>
- assert(partSize === rangeSize, "offset ranges are wrong")
- }
- total += collected.size
- }
- ssc.start()
- eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(total === data.values.sum * topics.size, "didn't get all messages")
- }
- ssc.stop()
- }
-}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 9b9e3f5fce..6774db854a 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- produceAndSendMessage(topic, sent)
+ sendMessages(topic, sent)
- val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+ val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val kc = new KafkaCluster(kafkaParams)
@@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val rdd2 = getRdd(kc, Set(topic))
val sent2 = Map("d" -> 1)
- produceAndSendMessage(topic, sent2)
+ sendMessages(topic, sent2)
// this is the "0 messages" case
// make sure we don't get anything, since messages were sent after rdd was defined
assert(rdd2.isDefined)
assert(rdd2.get.count === 0)
val rdd3 = getRdd(kc, Set(topic))
- produceAndSendMessage(topic, Map("extra" -> 22))
+ sendMessages(topic, Map("extra" -> 22))
// this is the "exactly 1 message" case
// make sure we get exactly one message, despite there being lots more available
assert(rdd3.isDefined)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index f207dc6d4f..e4966eebb9 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -48,30 +48,41 @@ import org.apache.spark.util.Utils
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
- var zkAddress: String = _
- var zkClient: ZkClient = _
-
private val zkHost = "localhost"
+ private var zkPort: Int = 0
private val zkConnectionTimeout = 6000
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
- private var zkPort: Int = 0
- protected var brokerPort = 9092
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
+ private var zkReady = false
+ private var brokerReady = false
+
+ protected var zkClient: ZkClient = _
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
- zkAddress = s"$zkHost:$zkPort"
- logInfo("==================== 0 ====================")
+ zkReady = true
+ logInfo("==================== Zookeeper Started ====================")
- zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
- ZKStringSerializer)
- logInfo("==================== 1 ====================")
+ zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+ logInfo("==================== Zookeeper Client Created ====================")
// Kafka broker startup
var bindSuccess: Boolean = false
@@ -80,9 +91,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
- logInfo("==================== 2 ====================")
server.startup()
- logInfo("==================== 3 ====================")
+ logInfo("==================== Kafka Broker Started ====================")
bindSuccess = true
} catch {
case e: KafkaException =>
@@ -94,10 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
Thread.sleep(2000)
- logInfo("==================== 4 ====================")
+ logInfo("==================== Kafka + Zookeeper Ready ====================")
+ brokerReady = true
}
def tearDownKafka() {
+ brokerReady = false
+ zkReady = false
if (producer != null) {
producer.close()
producer = null
@@ -121,26 +134,23 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
}
- private def createTestMessage(topic: String, sent: Map[String, Int])
- : Seq[KeyedMessage[String, String]] = {
- val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
- new KeyedMessage[String, String](topic, s)
- }
- messages.toSeq
- }
-
def createTopic(topic: String) {
AdminUtils.createTopic(zkClient, topic, 1, 1)
- logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
+ logInfo(s"==================== Topic $topic Created ====================")
}
- def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]) {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ def sendMessages(topic: String, messages: Array[String]) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
- producer.send(createTestMessage(topic, sent): _*)
+ producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
producer.close()
- logInfo("==================== 6 ====================")
+ logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================")
}
private def getBrokerConfig(): Properties = {
@@ -218,7 +228,7 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- produceAndSendMessage(topic, sent)
+ sendMessages(topic, sent)
val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 64ccc92c81..fc53c23abd 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -79,7 +79,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
test("Reliable Kafka input stream with single topic") {
var topic = "test-topic"
createTopic(topic)
- produceAndSendMessage(topic, data)
+ sendMessages(topic, data)
// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === None)
@@ -111,7 +111,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
createTopic(t)
- produceAndSendMessage(t, data)
+ sendMessages(t, data)
}
// Before started, verify all the group/topic/partition offsets are 0.