Author:    cody koeninger <cody@koeninger.org>  2015-02-11 00:13:27 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>  2015-02-11 00:13:27 -0800
Commit:    658687b25491047f30ee8558733d11e5a0572070 (patch)
Tree:      9f4d67993bcdf9b331a65a917d79b487bbf4cf7b /external
Parent:    c2131c0cdc57a4871ea23cd71e27e066d3c9a42c (diff)
[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array
Author: cody koeninger <cody@koeninger.org>

Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:

f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
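For context, a minimal sketch (not part of the patch) of how the reworked Scala createRDD is intended to be called after this change; the topic name, offsets, and broker host/port below are illustrative placeholders:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    def newStyleCreateRDD(sc: SparkContext, kafkaParams: Map[String, String]) = {
      val offsetRanges = Array(OffsetRange("topic1", 0, 0, 100))

      // Before this patch, leaders were passed as Array[Leader], which could carry
      // several entries for the same topic/partition. A Map keys each
      // TopicAndPartition to exactly one Broker.
      val leaders = Map(TopicAndPartition("topic1", 0) -> Broker("host1", 9092))

      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
        sc, kafkaParams, offsetRanges, leaders,
        (mmd: MessageAndMetadata[String, String]) => mmd.message)
    }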
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala (renamed from external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala) |  57
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala |  44
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 156
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala |  96
4 files changed, 287 insertions(+), 66 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
index c129a26836..5a74febb4b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -17,41 +17,52 @@
package org.apache.spark.streaming.kafka
-import kafka.common.TopicAndPartition
-
import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
- * Represent the host info for the leader of a Kafka partition.
+ * Represent the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
*/
@Experimental
-final class Leader private(
- /** Kafka topic name */
- val topic: String,
- /** Kafka partition id */
- val partition: Int,
- /** Leader's hostname */
+final class Broker private(
+ /** Broker's hostname */
val host: String,
- /** Leader's port */
- val port: Int) extends Serializable
+ /** Broker's port */
+ val port: Int) extends Serializable {
+ override def equals(obj: Any): Boolean = obj match {
+ case that: Broker =>
+ this.host == that.host &&
+ this.port == that.port
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ 41 * (41 + host.hashCode) + port
+ }
+
+ override def toString(): String = {
+ s"Broker($host, $port)"
+ }
+}
/**
* :: Experimental ::
- * Companion object the provides methods to create instances of [[Leader]].
+ * Companion object that provides methods to create instances of [[Broker]].
*/
@Experimental
-object Leader {
- def create(topic: String, partition: Int, host: String, port: Int): Leader =
- new Leader(topic, partition, host, port)
-
- def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
- new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
-
- def apply(topic: String, partition: Int, host: String, port: Int): Leader =
- new Leader(topic, partition, host, port)
+object Broker {
+ def create(host: String, port: Int): Broker =
+ new Broker(host, port)
- def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
- new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+ def apply(host: String, port: Int): Broker =
+ new Broker(host, port)
+ def unapply(broker: Broker): Option[(String, Int)] = {
+ if (broker == null) {
+ None
+ } else {
+ Some((broker.host, broker.port))
+ }
+ }
}
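An illustrative sketch (not from the patch) of the factory methods and extractor the new Broker companion object provides; the hostname is a placeholder:

    import org.apache.spark.streaming.kafka.Broker

    val a = Broker("broker-1.example.com", 9092)        // apply
    val b = Broker.create("broker-1.example.com", 9092) // Java-friendly factory

    // equals and hashCode are defined structurally on (host, port)
    assert(a == b && a.hashCode == b.hashCode)

    // unapply lets Broker take part in pattern matches, as KafkaUtils now does
    val Broker(host, port) = a
    println(s"$host:$port") // broker-1.example.com:9092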
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 7a2c3abdcc..af04bc6576 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -154,6 +154,19 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
+ /** get leaders for the given offset ranges, or throw an exception */
+ private def leadersForRanges(
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+ val kc = new KafkaCluster(kafkaParams)
+ val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+ val leaders = kc.findLeaders(topics).fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ leaders
+ }
+
/**
* Create a RDD from Kafka using offset ranges for each topic and partition.
*
@@ -176,12 +189,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
- val kc = new KafkaCluster(kafkaParams)
- val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
- val leaders = kc.findLeaders(topics).fold(
- errs => throw new SparkException(errs.mkString("\n")),
- ok => ok
- )
+ val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
@@ -198,7 +206,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
- * @param leaders Kafka leaders for each offset range in batch
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
@@ -211,12 +220,17 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
- leaders: Array[Leader],
+ leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
- val leaderMap = leaders
- .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
- .toMap
+ val leaderMap = if (leaders.isEmpty) {
+ leadersForRanges(kafkaParams, offsetRanges)
+ } else {
+ // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
+ leaders.map {
+ case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+ }.toMap
+ }
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}
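A hedged sketch (not in the patch) of the empty-map case documented above: when the leaders map is empty, createRDD falls back to leadersForRanges and resolves leaders on the driver; all parameter values are placeholders:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    def withoutKnownLeaders(
        sc: SparkContext,
        kafkaParams: Map[String, String],
        offsetRanges: Array[OffsetRange]) = {
      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
        sc, kafkaParams, offsetRanges,
        Map.empty[TopicAndPartition, Broker], // leaders looked up on the driver
        (mmd: MessageAndMetadata[String, String]) => mmd.message)
    }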
@@ -263,7 +277,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
- * @param leaders Kafka leaders for each offset range in batch
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
@@ -276,7 +291,7 @@ object KafkaUtils {
recordClass: Class[R],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange],
- leaders: Array[Leader],
+ leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
@@ -284,8 +299,9 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val leaderMap = Map(leaders.toSeq: _*)
createRDD[K, V, KD, VD, R](
- jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
}
/**
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000000..9d2e1705c6
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaRDDSuite implements Serializable {
+ private transient JavaSparkContext sc = null;
+ private transient KafkaStreamSuiteBase suiteBase = null;
+
+ @Before
+ public void setUp() {
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
+ System.clearProperty("spark.driver.port");
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ sc = new JavaSparkContext(sparkConf);
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ suiteBase.tearDownKafka();
+ }
+
+ @Test
+ public void testKafkaRDD() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+
+ OffsetRange[] offsetRanges = {
+ OffsetRange.create(topic1, 0, 0, 1),
+ OffsetRange.create(topic2, 0, 0, 1)
+ };
+
+ HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap();
+ HashMap<TopicAndPartition, Broker> leaders = new HashMap();
+ String[] hostAndPort = suiteBase.brokerAddress().split(":");
+ Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+ leaders.put(new TopicAndPartition(topic1, 0), broker);
+ leaders.put(new TopicAndPartition(topic2, 0), broker);
+
+ JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ offsetRanges
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(scala.Tuple2<String, String> kv) throws Exception {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ emptyLeaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ leaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ // just making sure the java user apis work; the scala tests handle logic corner cases
+ long count1 = rdd1.count();
+ long count2 = rdd2.count();
+ long count3 = rdd3.count();
+ Assert.assertTrue(count1 > 0);
+ Assert.assertEquals(count1, count2);
+ Assert.assertEquals(count1, count3);
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ suiteBase.createTopic(topic);
+ suiteBase.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 6774db854a..a223da70b0 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -21,18 +21,22 @@ import scala.util.Random
import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfter
+import kafka.message.MessageAndMetadata
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark._
import org.apache.spark.SparkContext._
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
var sc: SparkContext = _
- before {
+ override def beforeAll {
+ sc = new SparkContext(sparkConf)
+
setupKafka()
}
- after {
+ override def afterAll {
if (sc != null) {
sc.stop
sc = null
@@ -40,60 +44,94 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
tearDownKafka()
}
- test("Kafka RDD") {
- val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
- sc = new SparkContext(sparkConf)
+ test("basic usage") {
+ val topic = "topicbasic"
+ createTopic(topic)
+ val messages = Set("the", "quick", "brown", "fox")
+ sendMessages(topic, messages.toArray)
+
+
+ val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ sc, kafkaParams, offsetRanges)
+
+ val received = rdd.map(_._2).collect.toSet
+ assert(received === messages)
+ }
+
+ test("iterator boundary conditions") {
+ // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- sendMessages(topic, sent)
val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val kc = new KafkaCluster(kafkaParams)
- val rdd = getRdd(kc, Set(topic))
// this is the "lots of messages" case
- // make sure we get all of them
+ sendMessages(topic, sent)
+ // rdd defined from leaders after sending messages, should get the number sent
+ val rdd = getRdd(kc, Set(topic))
+
assert(rdd.isDefined)
- assert(rdd.get.count === sent.values.sum)
+ assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
- kc.setConsumerOffsets(
- kafkaParams("group.id"),
- rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
+ .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
- val rdd2 = getRdd(kc, Set(topic))
- val sent2 = Map("d" -> 1)
- sendMessages(topic, sent2)
// this is the "0 messages" case
- // make sure we dont get anything, since messages were sent after rdd was defined
+ val rdd2 = getRdd(kc, Set(topic))
+ // shouldn't get anything, since message is sent after rdd was defined
+ val sentOnlyOne = Map("d" -> 1)
+
+ sendMessages(topic, sentOnlyOne)
assert(rdd2.isDefined)
- assert(rdd2.get.count === 0)
+ assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
+ // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
val rdd3 = getRdd(kc, Set(topic))
+ // send lots of messages after rdd was defined, they shouldn't show up
sendMessages(topic, Map("extra" -> 22))
- // this is the "exactly 1 message" case
- // make sure we get exactly one message, despite there being lots more available
+
assert(rdd3.isDefined)
- assert(rdd3.get.count === sent2.values.sum)
+ assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
}
// get an rdd from the committed consumer offsets until the latest leader offsets,
private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
val groupId = kc.kafkaParams("group.id")
- for {
- topicPartitions <- kc.getPartitions(topics).right.toOption
- from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+ kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
offs.map(kv => kv._1 -> kv._2.offset)
}
)
- until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
- } yield {
- KafkaRDD[String, String, StringDecoder, StringDecoder, String](
- sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+ }
+ kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+ consumerOffsets(topicPartitions).flatMap { from =>
+ kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+ val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+ OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+ }.toArray
+
+ val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+ tp -> Broker(lo.host, lo.port)
+ }.toMap
+
+ KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, offsetRanges, leaders,
+ (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+ }
+ }
}
}
}