From 811d1798d7a76a70bc684f11854031763faadd42 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 11 Feb 2015 00:13:27 -0800 Subject: [SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array Author: cody koeninger Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits: f7151d4 [cody koeninger] [SPARK-4964] test refactoring 6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private 5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition (cherry picked from commit 658687b25491047f30ee8558733d11e5a0572070) Signed-off-by: Tathagata Das --- .../org/apache/spark/streaming/kafka/Broker.scala | 68 +++++++++ .../apache/spark/streaming/kafka/KafkaUtils.scala | 44 ++++-- .../org/apache/spark/streaming/kafka/Leader.scala | 57 -------- .../spark/streaming/kafka/JavaKafkaRDDSuite.java | 156 +++++++++++++++++++++ .../spark/streaming/kafka/KafkaRDDSuite.scala | 96 +++++++++---- 5 files changed, 321 insertions(+), 100 deletions(-) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala delete mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala create mode 100644 external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala new file mode 100644 index 0000000000..5a74febb4b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represent the host and port info for a Kafka broker. + * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID + */ +@Experimental +final class Broker private( + /** Broker's hostname */ + val host: String, + /** Broker's port */ + val port: Int) extends Serializable { + override def equals(obj: Any): Boolean = obj match { + case that: Broker => + this.host == that.host && + this.port == that.port + case _ => false + } + + override def hashCode: Int = { + 41 * (41 + host.hashCode) + port + } + + override def toString(): String = { + s"Broker($host, $port)" + } +} + +/** + * :: Experimental :: + * Companion object that provides methods to create instances of [[Broker]]. + */ +@Experimental +object Broker { + def create(host: String, port: Int): Broker = + new Broker(host, port) + + def apply(host: String, port: Int): Broker = + new Broker(host, port) + + def unapply(broker: Broker): Option[(String, Int)] = { + if (broker == null) { + None + } else { + Some((broker.host, broker.port)) + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 7a2c3abdcc..af04bc6576 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -154,6 +154,19 @@ object KafkaUtils { jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } + /** get leaders for the given offset ranges, or throw an exception */ + private def leadersForRanges( + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = { + val kc = new KafkaCluster(kafkaParams) + val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet + val leaders = kc.findLeaders(topics).fold( + errs => throw new SparkException(errs.mkString("\n")), + ok => ok + ) + leaders + } + /** * Create a RDD from Kafka using offset ranges for each topic and partition. * @@ -176,12 +189,7 @@ object KafkaUtils { offsetRanges: Array[OffsetRange] ): RDD[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) - val kc = new KafkaCluster(kafkaParams) - val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet - val leaders = kc.findLeaders(topics).fold( - errs => throw new SparkException(errs.mkString("\n")), - ok => ok - ) + val leaders = leadersForRanges(kafkaParams, offsetRanges) new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) } @@ -198,7 +206,8 @@ object KafkaUtils { * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition - * @param leaders Kafka leaders for each offset range in batch + * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, + * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental @@ -211,12 +220,17 @@ object KafkaUtils { sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange], - leaders: Array[Leader], + leaders: Map[TopicAndPartition, Broker], messageHandler: MessageAndMetadata[K, V] => R ): RDD[R] = { - val leaderMap = leaders - .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port)) - .toMap + val leaderMap = if (leaders.isEmpty) { + leadersForRanges(kafkaParams, offsetRanges) + } else { + // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker + leaders.map { + case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) + }.toMap + } new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) } @@ -263,7 +277,8 @@ object KafkaUtils { * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition - * @param leaders Kafka leaders for each offset range in batch + * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, + * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental @@ -276,7 +291,7 @@ object KafkaUtils { recordClass: Class[R], kafkaParams: JMap[String, String], offsetRanges: Array[OffsetRange], - leaders: Array[Leader], + leaders: JMap[TopicAndPartition, Broker], messageHandler: JFunction[MessageAndMetadata[K, V], R] ): JavaRDD[R] = { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) @@ -284,8 +299,9 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + val leaderMap = Map(leaders.toSeq: _*) createRDD[K, V, KD, VD, R]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _) } /** diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala deleted file mode 100644 index c129a26836..0000000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import kafka.common.TopicAndPartition - -import org.apache.spark.annotation.Experimental - -/** - * :: Experimental :: - * Represent the host info for the leader of a Kafka partition. - */ -@Experimental -final class Leader private( - /** Kafka topic name */ - val topic: String, - /** Kafka partition id */ - val partition: Int, - /** Leader's hostname */ - val host: String, - /** Leader's port */ - val port: Int) extends Serializable - -/** - * :: Experimental :: - * Companion object the provides methods to create instances of [[Leader]]. - */ -@Experimental -object Leader { - def create(topic: String, partition: Int, host: String, port: Int): Leader = - new Leader(topic, partition, host, port) - - def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader = - new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port) - - def apply(topic: String, partition: Int, host: String, port: Int): Leader = - new Leader(topic, partition, host, port) - - def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader = - new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port) - -} diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java new file mode 100644 index 0000000000..9d2e1705c6 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; + +import org.apache.spark.SparkConf; + +import scala.Tuple2; + +import junit.framework.Assert; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +public class JavaKafkaRDDSuite implements Serializable { + private transient JavaSparkContext sc = null; + private transient KafkaStreamSuiteBase suiteBase = null; + + @Before + public void setUp() { + suiteBase = new KafkaStreamSuiteBase() { }; + suiteBase.setupKafka(); + System.clearProperty("spark.driver.port"); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + sc = new JavaSparkContext(sparkConf); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + suiteBase.tearDownKafka(); + } + + @Test + public void testKafkaRDD() throws InterruptedException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + String[] topic1data = createTopicAndSendData(topic1); + String[] topic2data = createTopicAndSendData(topic2); + + HashMap kafkaParams = new HashMap(); + kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + + OffsetRange[] offsetRanges = { + OffsetRange.create(topic1, 0, 0, 1), + OffsetRange.create(topic2, 0, 0, 1) + }; + + HashMap emptyLeaders = new HashMap(); + HashMap leaders = new HashMap(); + String[] hostAndPort = suiteBase.brokerAddress().split(":"); + Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + leaders.put(new TopicAndPartition(topic1, 0), broker); + leaders.put(new TopicAndPartition(topic2, 0), broker); + + JavaRDD rdd1 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + offsetRanges + ).map( + new Function, String>() { + @Override + public String call(scala.Tuple2 kv) throws Exception { + return kv._2(); + } + } + ); + + JavaRDD rdd2 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + emptyLeaders, + new Function, String>() { + @Override + public String call(MessageAndMetadata msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + + JavaRDD rdd3 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + leaders, + new Function, String>() { + @Override + public String call(MessageAndMetadata msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + + // just making sure the java user apis work; the scala tests handle logic corner cases + long count1 = rdd1.count(); + long count2 = rdd2.count(); + long count3 = rdd3.count(); + Assert.assertTrue(count1 > 0); + Assert.assertEquals(count1, count2); + Assert.assertEquals(count1, count3); + } + + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; + suiteBase.createTopic(topic); + suiteBase.sendMessages(topic, data); + return data; + } +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 6774db854a..a223da70b0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -21,18 +21,22 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition -import org.scalatest.BeforeAndAfter +import kafka.message.MessageAndMetadata +import org.scalatest.BeforeAndAfterAll import org.apache.spark._ import org.apache.spark.SparkContext._ -class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { +class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) var sc: SparkContext = _ - before { + override def beforeAll { + sc = new SparkContext(sparkConf) + setupKafka() } - after { + override def afterAll { if (sc != null) { sc.stop sc = null @@ -40,60 +44,94 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { tearDownKafka() } - test("Kafka RDD") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - sc = new SparkContext(sparkConf) + test("basic usage") { + val topic = "topicbasic" + createTopic(topic) + val messages = Set("the", "quick", "brown", "fox") + sendMessages(topic, messages.toArray) + + + val kafkaParams = Map("metadata.broker.list" -> brokerAddress, + "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) + + val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( + sc, kafkaParams, offsetRanges) + + val received = rdd.map(_._2).collect.toSet + assert(received === messages) + } + + test("iterator boundary conditions") { + // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - sendMessages(topic, sent) val kafkaParams = Map("metadata.broker.list" -> brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val kc = new KafkaCluster(kafkaParams) - val rdd = getRdd(kc, Set(topic)) // this is the "lots of messages" case - // make sure we get all of them + sendMessages(topic, sent) + // rdd defined from leaders after sending messages, should get the number sent + val rdd = getRdd(kc, Set(topic)) + assert(rdd.isDefined) - assert(rdd.get.count === sent.values.sum) + assert(rdd.get.count === sent.values.sum, "didn't get all sent messages") - kc.setConsumerOffsets( - kafkaParams("group.id"), - rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap) + val ranges = rdd.get.asInstanceOf[HasOffsetRanges] + .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + + kc.setConsumerOffsets(kafkaParams("group.id"), ranges) - val rdd2 = getRdd(kc, Set(topic)) - val sent2 = Map("d" -> 1) - sendMessages(topic, sent2) // this is the "0 messages" case - // make sure we dont get anything, since messages were sent after rdd was defined + val rdd2 = getRdd(kc, Set(topic)) + // shouldn't get anything, since message is sent after rdd was defined + val sentOnlyOne = Map("d" -> 1) + + sendMessages(topic, sentOnlyOne) assert(rdd2.isDefined) - assert(rdd2.get.count === 0) + assert(rdd2.get.count === 0, "got messages when there shouldn't be any") + // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above val rdd3 = getRdd(kc, Set(topic)) + // send lots of messages after rdd was defined, they shouldn't show up sendMessages(topic, Map("extra" -> 22)) - // this is the "exactly 1 message" case - // make sure we get exactly one message, despite there being lots more available + assert(rdd3.isDefined) - assert(rdd3.get.count === sent2.values.sum) + assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message") } // get an rdd from the committed consumer offsets until the latest leader offsets, private def getRdd(kc: KafkaCluster, topics: Set[String]) = { val groupId = kc.kafkaParams("group.id") - for { - topicPartitions <- kc.getPartitions(topics).right.toOption - from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse( + def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = { + kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse( kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs => offs.map(kv => kv._1 -> kv._2.offset) } ) - until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption - } yield { - KafkaRDD[String, String, StringDecoder, StringDecoder, String]( - sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}") + } + kc.getPartitions(topics).right.toOption.flatMap { topicPartitions => + consumerOffsets(topicPartitions).flatMap { from => + kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until => + val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) => + OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset) + }.toArray + + val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) => + tp -> Broker(lo.host, lo.port) + }.toMap + + KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]( + sc, kc.kafkaParams, offsetRanges, leaders, + (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}") + } + } } } } -- cgit v1.2.3