diff options
author | cody koeninger <cody@koeninger.org> | 2015-02-11 00:13:27 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-02-11 00:13:27 -0800 |
commit | 658687b25491047f30ee8558733d11e5a0572070 (patch) | |
tree | 9f4d67993bcdf9b331a65a917d79b487bbf4cf7b /external/kafka/src/test | |
parent | c2131c0cdc57a4871ea23cd71e27e066d3c9a42c (diff) | |
download | spark-658687b25491047f30ee8558733d11e5a0572070.tar.gz spark-658687b25491047f30ee8558733d11e5a0572070.tar.bz2 spark-658687b25491047f30ee8558733d11e5a0572070.zip |
[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array
Author: cody koeninger <cody@koeninger.org>
Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:
f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r-- | external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 156 | ||||
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 96 |
2 files changed, 223 insertions, 29 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java new file mode 100644 index 0000000000..9d2e1705c6 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; + +import org.apache.spark.SparkConf; + +import scala.Tuple2; + +import junit.framework.Assert; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +public class JavaKafkaRDDSuite implements Serializable { + private transient JavaSparkContext sc = null; + private transient KafkaStreamSuiteBase suiteBase = null; + + @Before + public void setUp() { + suiteBase = new KafkaStreamSuiteBase() { }; + suiteBase.setupKafka(); + System.clearProperty("spark.driver.port"); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + sc = new JavaSparkContext(sparkConf); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + suiteBase.tearDownKafka(); + } + + @Test + public void testKafkaRDD() throws InterruptedException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + String[] topic1data = createTopicAndSendData(topic1); + String[] topic2data = createTopicAndSendData(topic2); + + HashMap<String, String> kafkaParams = new HashMap<String, String>(); + kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + + OffsetRange[] offsetRanges = { + OffsetRange.create(topic1, 0, 0, 1), + OffsetRange.create(topic2, 0, 0, 1) + }; + + HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap(); + HashMap<TopicAndPartition, Broker> leaders = new HashMap(); + String[] hostAndPort = suiteBase.brokerAddress().split(":"); + Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + leaders.put(new TopicAndPartition(topic1, 0), broker); + leaders.put(new TopicAndPartition(topic2, 0), broker); + + JavaRDD<String> rdd1 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + offsetRanges + ).map( + new Function<Tuple2<String, String>, String>() { + @Override + public String call(scala.Tuple2<String, String> kv) throws Exception { + return kv._2(); + } + } + ); + + JavaRDD<String> rdd2 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + emptyLeaders, + new Function<MessageAndMetadata<String, String>, String>() { + @Override + public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + + JavaRDD<String> rdd3 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + leaders, + new Function<MessageAndMetadata<String, String>, String>() { + @Override + public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + + // just making sure the java user apis work; the scala tests handle logic corner cases + long count1 = rdd1.count(); + long count2 = rdd2.count(); + long count3 = rdd3.count(); + Assert.assertTrue(count1 > 0); + Assert.assertEquals(count1, count2); + Assert.assertEquals(count1, count3); + } + + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; + suiteBase.createTopic(topic); + suiteBase.sendMessages(topic, data); + return data; + } +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 6774db854a..a223da70b0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -21,18 +21,22 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition -import org.scalatest.BeforeAndAfter +import kafka.message.MessageAndMetadata +import org.scalatest.BeforeAndAfterAll import org.apache.spark._ import org.apache.spark.SparkContext._ -class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { +class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) var sc: SparkContext = _ - before { + override def beforeAll { + sc = new SparkContext(sparkConf) + setupKafka() } - after { + override def afterAll { if (sc != null) { sc.stop sc = null @@ -40,60 +44,94 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { tearDownKafka() } - test("Kafka RDD") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - sc = new SparkContext(sparkConf) + test("basic usage") { + val topic = "topicbasic" + createTopic(topic) + val messages = Set("the", "quick", "brown", "fox") + sendMessages(topic, messages.toArray) + + + val kafkaParams = Map("metadata.broker.list" -> brokerAddress, + "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) + + val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( + sc, kafkaParams, offsetRanges) + + val received = rdd.map(_._2).collect.toSet + assert(received === messages) + } + + test("iterator boundary conditions") { + // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - sendMessages(topic, sent) val kafkaParams = Map("metadata.broker.list" -> brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val kc = new KafkaCluster(kafkaParams) - val rdd = getRdd(kc, Set(topic)) // this is the "lots of messages" case - // make sure we get all of them + sendMessages(topic, sent) + // rdd defined from leaders after sending messages, should get the number sent + val rdd = getRdd(kc, Set(topic)) + assert(rdd.isDefined) - assert(rdd.get.count === sent.values.sum) + assert(rdd.get.count === sent.values.sum, "didn't get all sent messages") - kc.setConsumerOffsets( - kafkaParams("group.id"), - rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap) + val ranges = rdd.get.asInstanceOf[HasOffsetRanges] + .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + + kc.setConsumerOffsets(kafkaParams("group.id"), ranges) - val rdd2 = getRdd(kc, Set(topic)) - val sent2 = Map("d" -> 1) - sendMessages(topic, sent2) // this is the "0 messages" case - // make sure we dont get anything, since messages were sent after rdd was defined + val rdd2 = getRdd(kc, Set(topic)) + // shouldn't get anything, since message is sent after rdd was defined + val sentOnlyOne = Map("d" -> 1) + + sendMessages(topic, sentOnlyOne) assert(rdd2.isDefined) - assert(rdd2.get.count === 0) + assert(rdd2.get.count === 0, "got messages when there shouldn't be any") + // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above val rdd3 = getRdd(kc, Set(topic)) + // send lots of messages after rdd was defined, they shouldn't show up sendMessages(topic, Map("extra" -> 22)) - // this is the "exactly 1 message" case - // make sure we get exactly one message, despite there being lots more available + assert(rdd3.isDefined) - assert(rdd3.get.count === sent2.values.sum) + assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message") } // get an rdd from the committed consumer offsets until the latest leader offsets, private def getRdd(kc: KafkaCluster, topics: Set[String]) = { val groupId = kc.kafkaParams("group.id") - for { - topicPartitions <- kc.getPartitions(topics).right.toOption - from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse( + def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = { + kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse( kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs => offs.map(kv => kv._1 -> kv._2.offset) } ) - until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption - } yield { - KafkaRDD[String, String, StringDecoder, StringDecoder, String]( - sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}") + } + kc.getPartitions(topics).right.toOption.flatMap { topicPartitions => + consumerOffsets(topicPartitions).flatMap { from => + kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until => + val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) => + OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset) + }.toArray + + val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) => + tp -> Broker(lo.host, lo.port) + }.toMap + + KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]( + sc, kc.kafkaParams, offsetRanges, leaders, + (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}") + } + } } } } |