path: root/external/kafka/src/test
author: cody koeninger <cody@koeninger.org> 2015-02-11 00:13:27 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-02-11 00:13:27 -0800
commit: 658687b25491047f30ee8558733d11e5a0572070 (patch)
tree: 9f4d67993bcdf9b331a65a917d79b487bbf4cf7b /external/kafka/src/test
parent: c2131c0cdc57a4871ea23cd71e27e066d3c9a42c (diff)
download: spark-658687b25491047f30ee8558733d11e5a0572070.tar.gz
          spark-658687b25491047f30ee8558733d11e5a0572070.tar.bz2
          spark-658687b25491047f30ee8558733d11e5a0572070.zip
[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array
Author: cody koeninger <cody@koeninger.org>

Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:

f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r-- external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 156
-rw-r--r-- external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 96
2 files changed, 223 insertions(+), 29 deletions(-)
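For orientation, here is a minimal sketch of the post-refactor Scala call shape that this patch's tests exercise: leaders are now passed as a Map keyed by TopicAndPartition rather than as an array. The broker address, topic name, and offset range below are illustrative assumptions, not values taken from the patch.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

// Sketch only: broker host/port, topic, and offsets are assumed example values.
def readValues(sc: SparkContext): Array[String] = {
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val offsetRanges = Array(OffsetRange("sometopic", 0, 0, 100))
  // The map keyed by TopicAndPartition is what this change introduces,
  // so each partition carries at most one preferred leader.
  val leaders = Map(TopicAndPartition("sometopic", 0) -> Broker("localhost", 9092))
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
    sc, kafkaParams, offsetRanges, leaders,
    (mmd: MessageAndMetadata[String, String]) => mmd.message)
    .collect()
}

Because leaders is a map, a duplicate TopicAndPartition key simply overwrites the earlier entry, which is how the commit's stated goal of "1 leader per TopicPartition" is enforced by construction.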
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000000..9d2e1705c6
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaRDDSuite implements Serializable {
+ private transient JavaSparkContext sc = null;
+ private transient KafkaStreamSuiteBase suiteBase = null;
+
+ @Before
+ public void setUp() {
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
+ System.clearProperty("spark.driver.port");
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ sc = new JavaSparkContext(sparkConf);
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ suiteBase.tearDownKafka();
+ }
+
+ @Test
+ public void testKafkaRDD() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+
+ OffsetRange[] offsetRanges = {
+ OffsetRange.create(topic1, 0, 0, 1),
+ OffsetRange.create(topic2, 0, 0, 1)
+ };
+
+ HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
+ HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
+ String[] hostAndPort = suiteBase.brokerAddress().split(":");
+ Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+ leaders.put(new TopicAndPartition(topic1, 0), broker);
+ leaders.put(new TopicAndPartition(topic2, 0), broker);
+
+ JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ offsetRanges
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(scala.Tuple2<String, String> kv) throws Exception {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ emptyLeaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ leaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ // just making sure the java user apis work; the scala tests handle logic corner cases
+ long count1 = rdd1.count();
+ long count2 = rdd2.count();
+ long count3 = rdd3.count();
+ Assert.assertTrue(count1 > 0);
+ Assert.assertEquals(count1, count2);
+ Assert.assertEquals(count1, count3);
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ suiteBase.createTopic(topic);
+ suiteBase.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 6774db854a..a223da70b0 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -21,18 +21,22 @@ import scala.util.Random
import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfter
+import kafka.message.MessageAndMetadata
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark._
import org.apache.spark.SparkContext._
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
var sc: SparkContext = _
- before {
+ override def beforeAll {
+ sc = new SparkContext(sparkConf)
+
setupKafka()
}
- after {
+ override def afterAll {
if (sc != null) {
sc.stop
sc = null
@@ -40,60 +44,94 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
tearDownKafka()
}
- test("Kafka RDD") {
- val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
- sc = new SparkContext(sparkConf)
+ test("basic usage") {
+ val topic = "topicbasic"
+ createTopic(topic)
+ val messages = Set("the", "quick", "brown", "fox")
+ sendMessages(topic, messages.toArray)
+
+
+ val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ sc, kafkaParams, offsetRanges)
+
+ val received = rdd.map(_._2).collect.toSet
+ assert(received === messages)
+ }
+
+ test("iterator boundary conditions") {
+ // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- sendMessages(topic, sent)
val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val kc = new KafkaCluster(kafkaParams)
- val rdd = getRdd(kc, Set(topic))
// this is the "lots of messages" case
- // make sure we get all of them
+ sendMessages(topic, sent)
+ // rdd defined from leaders after sending messages, should get the number sent
+ val rdd = getRdd(kc, Set(topic))
+
assert(rdd.isDefined)
- assert(rdd.get.count === sent.values.sum)
+ assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
- kc.setConsumerOffsets(
- kafkaParams("group.id"),
- rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
+ .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
- val rdd2 = getRdd(kc, Set(topic))
- val sent2 = Map("d" -> 1)
- sendMessages(topic, sent2)
// this is the "0 messages" case
- // make sure we dont get anything, since messages were sent after rdd was defined
+ val rdd2 = getRdd(kc, Set(topic))
+ // shouldn't get anything, since message is sent after rdd was defined
+ val sentOnlyOne = Map("d" -> 1)
+
+ sendMessages(topic, sentOnlyOne)
assert(rdd2.isDefined)
- assert(rdd2.get.count === 0)
+ assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
+ // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
val rdd3 = getRdd(kc, Set(topic))
+ // send lots of messages after rdd was defined, they shouldn't show up
sendMessages(topic, Map("extra" -> 22))
- // this is the "exactly 1 message" case
- // make sure we get exactly one message, despite there being lots more available
+
assert(rdd3.isDefined)
- assert(rdd3.get.count === sent2.values.sum)
+ assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
}
// get an rdd from the committed consumer offsets until the latest leader offsets,
private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
val groupId = kc.kafkaParams("group.id")
- for {
- topicPartitions <- kc.getPartitions(topics).right.toOption
- from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+ kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
offs.map(kv => kv._1 -> kv._2.offset)
}
)
- until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
- } yield {
- KafkaRDD[String, String, StringDecoder, StringDecoder, String](
- sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+ }
+ kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+ consumerOffsets(topicPartitions).flatMap { from =>
+ kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+ val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+ OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+ }.toArray
+
+ val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+ tp -> Broker(lo.host, lo.port)
+ }.toMap
+
+ KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, offsetRanges, leaders,
+ (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+ }
+ }
}
}
}