path: root/external/kafka/src/test
author     cody koeninger <cody@koeninger.org>  2015-02-04 12:06:34 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-02-04 12:06:34 -0800
commit     b0c0021953826bccaee818a54afc44e8bdfa8572 (patch)
tree       2d196c9e214dcdc15f4d6598c1dc1e17dce363ca /external/kafka/src/test
parent     ac0b2b788ff144970d6fdbdc445367772770458d (diff)
[SPARK-4964] [Streaming] Exactly-once semantics for Kafka
Author: cody koeninger <cody@koeninger.org>

Closes #3798 from koeninger/kafkaRdd and squashes the following commits:

1dc2941 [cody koeninger] [SPARK-4964] silence ConsumerConfig warnings about broker connection props
59e29f6 [cody koeninger] [SPARK-4964] settle on "Direct" as a naming convention for the new stream
8c31855 [cody koeninger] [SPARK-4964] remove HasOffsetRanges interface from return types
0df3ebe [cody koeninger] [SPARK-4964] add comments per pwendell / dibbhatt
8991017 [cody koeninger] [SPARK-4964] formatting
825110f [cody koeninger] [SPARK-4964] rename stuff per TD
4354bce [cody koeninger] [SPARK-4964] per td, remove java interfaces, replace with final classes, corresponding changes to KafkaRDD constructor and checkpointing
9adaa0a [cody koeninger] [SPARK-4964] formatting
0090553 [cody koeninger] [SPARK-4964] javafication of interfaces
9a838c2 [cody koeninger] [SPARK-4964] code cleanup, add more tests
2b340d8 [cody koeninger] [SPARK-4964] refactor per TD feedback
80fd6ae [cody koeninger] [SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising, change doc
99d2eba [cody koeninger] [SPARK-4964] Reduce level of nesting. If beginning is past end, its actually an error (may happen if Kafka topic was deleted and recreated)
19406cc [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
2e67117 [cody koeninger] [SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
bb80bbe [cody koeninger] [SPARK-4964] scalastyle line length
d4a7cf7 [cody koeninger] [SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
c1bd6d9 [cody koeninger] [SPARK-4964] use newly available attemptNumber for correct retry behavior
548d529 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
0458e4e [cody koeninger] [SPARK-4964] recovery of generated rdds from checkpoint
e86317b [cody koeninger] [SPARK-4964] try seed brokers in random order to spread metadata requests
e93eb72 [cody koeninger] [SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
356c7cc [cody koeninger] [SPARK-4964] code cleanup per helena
adf99a6 [cody koeninger] [SPARK-4964] fix serialization issues for checkpointing
1d50749 [cody koeninger] [SPARK-4964] code cleanup per tdas
8bfd6c0 [cody koeninger] [SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
e09045b [cody koeninger] [SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent map + empty foreach boilerplate
cac63ee [cody koeninger] additional testing, fix fencepost error
37d3053 [cody koeninger] make KafkaRDDPartition available to users so offsets can be committed per partition
bcca8a4 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
6bf14f2 [cody koeninger] first attempt at a Kafka dstream that allows for exactly-once semantics
326ff3c [cody koeninger] add some tests
38bb727 [cody koeninger] give easy access to the parameters of a KafkaRDD
979da25 [cody koeninger] dont allow empty leader offsets to be returned
8d7de4a [cody koeninger] make sure leader offsets can be found even for leaders that arent in the seed brokers
4b078bf [cody koeninger] differentiate between leader and consumer offsets in error message
3c2a96a [cody koeninger] fix scalastyle errors
29c6b43 [cody koeninger] cleanup logging
783b477 [cody koeninger] update tests for kafka 8.1.1
7d050bc [cody koeninger] methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior
ce91c59 [cody koeninger] method to get consumer offsets, explicit error handling
4dafd1b [cody koeninger] method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster
0b94b33 [cody koeninger] use dropWhile rather than filter to trim beginning of fetch response
1d70625 [cody koeninger] WIP on kafka cluster
76913e2 [cody koeninger] Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
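
For orientation before the test diffs, here is a minimal sketch of the usage pattern these suites exercise: consume with the new direct stream, read the per-partition offset ranges off each batch RDD, and commit them back through KafkaCluster. The object name, broker address, group id, and topic name are placeholders, and the sketch sits in the org.apache.spark.streaming.kafka package (as the tests do) so it can reach the package-level KafkaCluster helper; it is illustrative only, not part of this commit.

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Milliseconds(500))

    // Placeholder broker list and group id; point these at a real cluster.
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "group.id" -> "direct-sketch",
      "auto.offset.reset" -> "smallest")

    // Same signature as used in KafkaDirectStreamSuite below.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topicA"))

    val kc = new KafkaCluster(kafkaParams)

    stream.foreachRDD { rdd =>
      // Each batch RDD carries the exact Kafka offset range it was built from.
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process the batch, then record the high-water mark per partition,
      // mirroring what KafkaRDDSuite does with setConsumerOffsets.
      println(s"batch had ${rdd.count()} messages")
      kc.setConsumerOffsets(
        kafkaParams("group.id"),
        offsets.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
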
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala       73
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala  92
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala           99
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala         8
4 files changed, 268 insertions(+), 4 deletions(-)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..e57c8f6987
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val topic = "kcsuitetopic" + Random.nextInt(10000)
+
+ val topicAndPartition = TopicAndPartition(topic, 0)
+
+ before {
+ setupKafka()
+ createTopic(topic)
+ produceAndSendMessage(topic, Map("a" -> 1))
+ }
+
+ after {
+ tearDownKafka()
+ }
+
+ test("metadata apis") {
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+ assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+ val parts = kc.getPartitions(Set(topic)).right.get
+ assert(parts(topicAndPartition), "didn't get partitions")
+ }
+
+ test("leader offset apis") {
+ val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+ val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+ }
+
+ test("consumer offset apis") {
+ val group = "kcsuitegroup" + Random.nextInt(10000)
+
+ val offset = Random.nextInt(10000)
+
+ val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+ assert(set.isRight, "didn't set consumer offsets")
+
+ val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+ assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
new file mode 100644
index 0000000000..0891ce344f
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerHost:$brokerPort",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ var ssc: StreamingContext = _
+
+ before {
+ setupKafka()
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ tearDownKafka()
+ }
+
+ test("multi topic stream") {
+ val topics = Set("newA", "newB")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ createTopic(t)
+ produceAndSendMessage(t, data)
+ }
+ val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ var total = 0L
+
+ stream.foreachRDD { rdd =>
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ val off = offsets(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ all.map { _ =>
+ (partSize, rangeSize)
+ }.toIterator
+ }.collect
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ total += collected.size
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(total === data.values.sum * topics.size, "didn't get all messages")
+ }
+ ssc.stop()
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..9b9e3f5fce
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ var sc: SparkContext = _
+ before {
+ setupKafka()
+ }
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ tearDownKafka()
+ }
+
+ test("Kafka RDD") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ sc = new SparkContext(sparkConf)
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val rdd = getRdd(kc, Set(topic))
+ // this is the "lots of messages" case
+ // make sure we get all of them
+ assert(rdd.isDefined)
+ assert(rdd.get.count === sent.values.sum)
+
+ kc.setConsumerOffsets(
+ kafkaParams("group.id"),
+ rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+
+ val rdd2 = getRdd(kc, Set(topic))
+ val sent2 = Map("d" -> 1)
+ produceAndSendMessage(topic, sent2)
+ // this is the "0 messages" case
+ // make sure we don't get anything, since messages were sent after rdd was defined
+ assert(rdd2.isDefined)
+ assert(rdd2.get.count === 0)
+
+ val rdd3 = getRdd(kc, Set(topic))
+ produceAndSendMessage(topic, Map("extra" -> 22))
+ // this is the "exactly 1 message" case
+ // make sure we get exactly one message, despite there being lots more available
+ assert(rdd3.isDefined)
+ assert(rdd3.get.count === sent2.values.sum)
+
+ }
+
+ // get an rdd from the committed consumer offsets until the latest leader offsets
+ private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+ val groupId = kc.kafkaParams("group.id")
+ for {
+ topicPartitions <- kc.getPartitions(topics).right.toOption
+ from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+ offs.map(kv => kv._1 -> kv._2.offset)
+ }
+ )
+ until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+ } yield {
+ KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+ }
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 0817c56d8f..f207dc6d4f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
private var zkPort: Int = 0
- private var brokerPort = 9092
+ protected var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
def createTopic(topic: String) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(
- server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+ server.apis.metadataCache.containsTopicAndPartition(topic, partition),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
}