aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala73
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala92
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala99
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala8
4 files changed, 268 insertions, 4 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..e57c8f6987
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val topic = "kcsuitetopic" + Random.nextInt(10000)
+
+ val topicAndPartition = TopicAndPartition(topic, 0)
+
+ before {
+ setupKafka()
+ createTopic(topic)
+ produceAndSendMessage(topic, Map("a" -> 1))
+ }
+
+ after {
+ tearDownKafka()
+ }
+
+ test("metadata apis") {
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+ assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+ val parts = kc.getPartitions(Set(topic)).right.get
+ assert(parts(topicAndPartition), "didn't get partitions")
+ }
+
+ test("leader offset apis") {
+ val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+ val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+ }
+
+ test("consumer offset apis") {
+ val group = "kcsuitegroup" + Random.nextInt(10000)
+
+ val offset = Random.nextInt(10000)
+
+ val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+ assert(set.isRight, "didn't set consumer offsets")
+
+ val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+ assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
new file mode 100644
index 0000000000..0891ce344f
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerHost:$brokerPort",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ var ssc: StreamingContext = _
+
+ before {
+ setupKafka()
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ tearDownKafka()
+ }
+
+ test("multi topic stream") {
+ val topics = Set("newA", "newB")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ createTopic(t)
+ produceAndSendMessage(t, data)
+ }
+ val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ var total = 0L;
+
+ stream.foreachRDD { rdd =>
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ val off = offsets(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ all.map { _ =>
+ (partSize, rangeSize)
+ }.toIterator
+ }.collect
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ total += collected.size
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(total === data.values.sum * topics.size, "didn't get all messages")
+ }
+ ssc.stop()
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..9b9e3f5fce
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ var sc: SparkContext = _
+ before {
+ setupKafka()
+ }
+
+ after {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+ tearDownKafka()
+ }
+
+ test("Kafka RDD") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ sc = new SparkContext(sparkConf)
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val rdd = getRdd(kc, Set(topic))
+ // this is the "lots of messages" case
+ // make sure we get all of them
+ assert(rdd.isDefined)
+ assert(rdd.get.count === sent.values.sum)
+
+ kc.setConsumerOffsets(
+ kafkaParams("group.id"),
+ rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+
+ val rdd2 = getRdd(kc, Set(topic))
+ val sent2 = Map("d" -> 1)
+ produceAndSendMessage(topic, sent2)
+ // this is the "0 messages" case
+ // make sure we dont get anything, since messages were sent after rdd was defined
+ assert(rdd2.isDefined)
+ assert(rdd2.get.count === 0)
+
+ val rdd3 = getRdd(kc, Set(topic))
+ produceAndSendMessage(topic, Map("extra" -> 22))
+ // this is the "exactly 1 message" case
+ // make sure we get exactly one message, despite there being lots more available
+ assert(rdd3.isDefined)
+ assert(rdd3.get.count === sent2.values.sum)
+
+ }
+
+ // get an rdd from the committed consumer offsets until the latest leader offsets,
+ private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+ val groupId = kc.kafkaParams("group.id")
+ for {
+ topicPartitions <- kc.getPartitions(topics).right.toOption
+ from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+ offs.map(kv => kv._1 -> kv._2.offset)
+ }
+ )
+ until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+ } yield {
+ KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+ }
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 0817c56d8f..f207dc6d4f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
private var zkPort: Int = 0
- private var brokerPort = 9092
+ protected var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
def createTopic(topic: String) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(
- server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+ server.apis.metadataCache.containsTopicAndPartition(topic, partition),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
}