From b0c0021953826bccaee818a54afc44e8bdfa8572 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Wed, 4 Feb 2015 12:06:34 -0800
Subject: [SPARK-4964] [Streaming] Exactly-once semantics for Kafka

Author: cody koeninger

Closes #3798 from koeninger/kafkaRdd and squashes the following commits:

1dc2941 [cody koeninger] [SPARK-4964] silence ConsumerConfig warnings about broker connection props
59e29f6 [cody koeninger] [SPARK-4964] settle on "Direct" as a naming convention for the new stream
8c31855 [cody koeninger] [SPARK-4964] remove HasOffsetRanges interface from return types
0df3ebe [cody koeninger] [SPARK-4964] add comments per pwendell / dibbhatt
8991017 [cody koeninger] [SPARK-4964] formatting
825110f [cody koeninger] [SPARK-4964] rename stuff per TD
4354bce [cody koeninger] [SPARK-4964] per td, remove java interfaces, replace with final classes, corresponding changes to KafkaRDD constructor and checkpointing
9adaa0a [cody koeninger] [SPARK-4964] formatting
0090553 [cody koeninger] [SPARK-4964] javafication of interfaces
9a838c2 [cody koeninger] [SPARK-4964] code cleanup, add more tests
2b340d8 [cody koeninger] [SPARK-4964] refactor per TD feedback
80fd6ae [cody koeninger] [SPARK-4964] Rename createExactlyOnceStream so it isn't over-promising, change doc
99d2eba [cody koeninger] [SPARK-4964] Reduce level of nesting. If beginning is past end, it's actually an error (may happen if Kafka topic was deleted and recreated)
19406cc [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
2e67117 [cody koeninger] [SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
bb80bbe [cody koeninger] [SPARK-4964] scalastyle line length
d4a7cf7 [cody koeninger] [SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
c1bd6d9 [cody koeninger] [SPARK-4964] use newly available attemptNumber for correct retry behavior
548d529 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
0458e4e [cody koeninger] [SPARK-4964] recovery of generated rdds from checkpoint
e86317b [cody koeninger] [SPARK-4964] try seed brokers in random order to spread metadata requests
e93eb72 [cody koeninger] [SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
356c7cc [cody koeninger] [SPARK-4964] code cleanup per helena
adf99a6 [cody koeninger] [SPARK-4964] fix serialization issues for checkpointing
1d50749 [cody koeninger] [SPARK-4964] code cleanup per tdas
8bfd6c0 [cody koeninger] [SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
e09045b [cody koeninger] [SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent map + empty foreach boilerplate
cac63ee [cody koeninger] additional testing, fix fencepost error
37d3053 [cody koeninger] make KafkaRDDPartition available to users so offsets can be committed per partition
bcca8a4 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
6bf14f2 [cody koeninger] first attempt at a Kafka dstream that allows for exactly-once semantics
326ff3c [cody koeninger] add some tests
38bb727 [cody koeninger] give easy access to the parameters of a KafkaRDD
979da25 [cody koeninger] don't allow empty leader offsets to be returned
8d7de4a [cody koeninger] make sure leader offsets can be found even for leaders that aren't in the seed brokers
4b078bf [cody koeninger] differentiate between leader and consumer offsets in error message
3c2a96a [cody koeninger] fix scalastyle errors
29c6b43 [cody koeninger] cleanup logging
783b477 [cody koeninger] update tests for kafka 8.1.1
7d050bc [cody koeninger] methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior
ce91c59 [cody koeninger] method to get consumer offsets, explicit error handling
4dafd1b [cody koeninger] method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster
0b94b33 [cody koeninger] use dropWhile rather than filter to trim beginning of fetch response
1d70625 [cody koeninger] WIP on kafka cluster
76913e2 [cody koeninger] Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
---
 .../spark/streaming/kafka/KafkaClusterSuite.scala | 73 ++++++++++++++++
 .../streaming/kafka/KafkaDirectStreamSuite.scala  | 92 ++++++++++++++++++++
 .../spark/streaming/kafka/KafkaRDDSuite.scala     | 99 ++++++++++++++++++++++
 .../spark/streaming/kafka/KafkaStreamSuite.scala  |  8 +-
 4 files changed, 268 insertions(+), 4 deletions(-)
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

(limited to 'external/kafka/src/test')
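For readers skimming the patch: the user-facing entry point added by this change is KafkaUtils.createDirectStream, exercised in KafkaDirectStreamSuite below. A minimal sketch of application code following the tests' usage; the broker address and topic names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setMaster("local[4]").setAppName("direct-stream-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(500))
    // The direct stream talks to the brokers itself, so only broker metadata is
    // needed; there is no receiver and no ZooKeeper-based consumer group.
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",  // placeholder broker
      "auto.offset.reset" -> "smallest")           // start from the earliest available offset
    // Type parameters: key type, value type, key decoder, value decoder.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topicA", "topicB"))   // placeholder topics
    stream.foreachRDD { rdd => println(s"batch of ${rdd.count} messages") }
    ssc.start()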
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..e57c8f6987
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  val brokerHost = "localhost"
+
+  val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+  val kc = new KafkaCluster(kafkaParams)
+
+  val topic = "kcsuitetopic" + Random.nextInt(10000)
+
+  val topicAndPartition = TopicAndPartition(topic, 0)
+
+  before {
+    setupKafka()
+    createTopic(topic)
+    produceAndSendMessage(topic, Map("a" -> 1))
+  }
+
+  after {
+    tearDownKafka()
+  }
+
+  test("metadata apis") {
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+    val parts = kc.getPartitions(Set(topic)).right.get
+    assert(parts(topicAndPartition), "didn't get partitions")
+  }
+
+  test("leader offset apis") {
+    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+  }
+
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(10000)
+
+    val offset = Random.nextInt(10000)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
+
+    val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+    assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+  }
+}
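The suite above shows the contract of the new KafkaCluster helper: each metadata and offset call returns an Either, with errors on the left, so callers should pattern match rather than using .right.get as the tests do. A sketch of that calling pattern (note the tests live in the same package as KafkaCluster; its visibility from outside that package may differ, and the broker, topic, and group names here are placeholders):

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
    val tp = TopicAndPartition("sometopic", 0)  // placeholder topic/partition

    // Leader offsets come straight from the brokers; Left carries the errors.
    kc.getLatestLeaderOffsets(Set(tp)) match {
      case Right(offsets) => println(s"latest: ${offsets(tp).offset}")
      case Left(errs) => println(s"offset lookup failed: $errs")
    }

    // Consumer offsets are tracked per group; setting them records progress for that group.
    val committed = kc.setConsumerOffsets("somegroup", Map(tp -> 0L))
    assert(committed.isRight, "failed to set consumer offsets")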
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
new file mode 100644
index 0000000000..0891ce344f
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+
+  val brokerHost = "localhost"
+
+  val kafkaParams = Map(
+    "metadata.broker.list" -> s"$brokerHost:$brokerPort",
+    "auto.offset.reset" -> "smallest"
+  )
+
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    tearDownKafka()
+  }
+
+  test("multi topic stream") {
+    val topics = Set("newA", "newB")
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      createTopic(t)
+      produceAndSendMessage(t, data)
+    }
+    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics)
+    var total = 0L
+
+    stream.foreachRDD { rdd =>
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        val off = offsets(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        all.map { _ =>
+          (partSize, rangeSize)
+        }.toIterator
+      }.collect
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+      total += collected.size
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(total === data.values.sum * topics.size, "didn't get all messages")
+    }
+    ssc.stop()
+  }
+}
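The cast to HasOffsetRanges in the test above is the intended way to recover offsets from a direct stream batch: commit 8c31855 removed the interface from return types, so the static type of each batch is a plain RDD and offsetRanges is only reachable via the cast. A sketch of per-batch offset bookkeeping in that style, building on the earlier stream sketch:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // One offset range per Kafka partition, aligned with the RDD's partitions.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { o =>
        // fromOffset is inclusive, untilOffset exclusive, matching typical Kafka consumer behavior.
        println(s"${o.topic}-${o.partition}: [${o.fromOffset}, ${o.untilOffset})")
      }
      // Process the batch here; persisting results and untilOffsets atomically
      // is what enables end-to-end exactly-once output.
    }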
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..9b9e3f5fce
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the "lots of messages" case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
+      kafkaParams("group.id"),
+      rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+
+    val rdd2 = getRdd(kc, Set(topic))
+    val sent2 = Map("d" -> 1)
+    produceAndSendMessage(topic, sent2)
+    // this is the "0 messages" case
+    // make sure we don't get anything, since messages were sent after rdd was defined
+    assert(rdd2.isDefined)
+    assert(rdd2.get.count === 0)
+
+    val rdd3 = getRdd(kc, Set(topic))
+    produceAndSendMessage(topic, Map("extra" -> 22))
+    // this is the "exactly 1 message" case
+    // make sure we get exactly one message, despite there being lots more available
+    assert(rdd3.isDefined)
+    assert(rdd3.get.count === sent2.values.sum)
+
+  }
+
+  // get an rdd from the committed consumer offsets until the latest leader offsets
+  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+    val groupId = kc.kafkaParams("group.id")
+    for {
+      topicPartitions <- kc.getPartitions(topics).right.toOption
+      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+          offs.map(kv => kv._1 -> kv._2.offset)
+        }
+      )
+      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+    } yield {
+      KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+        sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+    }
+  }
+}
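getRdd above also demonstrates the batch-oriented workflow: read from the group's committed consumer offsets (falling back to the earliest leader offsets on first run) up to the latest leader offsets, then commit each range's untilOffset so the next run starts where this one stopped. A sketch of that commit step, assuming the offset-range element type is the OffsetRange class introduced elsewhere in this patch (only its topic/partition/untilOffset fields, used by the test above, are relied on):

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}

    // After successfully processing a batch, advance the group's committed offsets
    // to each range's untilOffset; re-running before the commit replays the same range.
    def commitProcessed(kc: KafkaCluster, groupId: String, ranges: Array[OffsetRange]): Unit = {
      val offsets = ranges.map { o =>
        TopicAndPartition(o.topic, o.partition) -> o.untilOffset
      }.toMap
      val result = kc.setConsumerOffsets(groupId, offsets)
      require(result.isRight, s"failed to commit offsets: $result")
    }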
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 0817c56d8f..f207dc6d4f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.{StringDecoder, StringEncoder}
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
   private val zkSessionTimeout = 6000
   private var zookeeper: EmbeddedZookeeper = _
   private var zkPort: Int = 0
-  private var brokerPort = 9092
+  protected var brokerPort = 9092
   private var brokerConf: KafkaConfig = _
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
   }
 
   def createTopic(topic: String) {
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       assert(
-        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+        server.apis.metadataCache.containsTopicAndPartition(topic, partition),
         s"Partition [$topic, $partition] metadata not propagated after timeout"
       )
     }
-- 
cgit v1.2.3