Diffstat (limited to 'external/kafka-0-10/src/test')
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java  | 15
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala   | 74
2 files changed, 86 insertions(+), 3 deletions(-)
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
+import java.util.regex.Pattern;
import scala.collection.JavaConverters;
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
+ final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
+ final ConsumerStrategy<String, String> psub1 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> psub2 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+ final ConsumerStrategy<String, String> psub3 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> psub4 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+
+ Assert.assertEquals(
+ psub1.executorKafkaParams().get("bootstrap.servers"),
+ psub3.executorKafkaParams().get("bootstrap.servers"));
+
final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 0a53259802..c9e15bcba0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -103,7 +103,9 @@ class DirectKafkaStreamSuite
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
- val totalSent = data.values.sum * topics.size
+ val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
+ // one topic starts 2 messages later
+ val expectedTotal = (data.values.sum * topics.size) - 2
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
@@ -111,7 +113,7 @@ class DirectKafkaStreamSuite
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
- ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
+ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(allReceived.size === totalSent,
+ assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
+ test("pattern based subscription") {
+ val topics = List("pat1", "pat2", "advanced3")
+ // Should match 2 out of 3 topics
+ val pat = """pat\d""".r.pattern
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+ val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
+ // 2 matching topics, one of which starts 3 messages later
+ val expectedTotal = (data.values.sum * 2) - 3
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
+ }
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+ // hold a reference to the current offset ranges, so it can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+ val tf = stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd.map(r => (r.key, r.value))
+ }
+
+ tf.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get the size of its offset range
+ // and the number of items in the partition
+ val off = offsetRanges(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify that the number of elements in each partition
+ // matches the size of the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ }
+
+ stream.foreachRDD { rdd =>
+ allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === expectedTotal,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+ }
+ ssc.stop()
+ }
+
+
test("receiving from largest starting offset") {
val topic = "latest"
val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
kc.close()
// Setup context and kafka stream with largest offset
+ kafkaParams.put("auto.offset.reset", "none")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](
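
Outside the test harness, the pattern-based subscription and offset-range handling added in DirectKafkaStreamSuite might look roughly like the sketch below. The broker address, group id, application name, and the topic/offset values are assumptions for illustration, not part of the commit.

    import java.util.regex.Pattern
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    val conf = new SparkConf().setMaster("local[*]").setAppName("subscribe-pattern-example")
    val ssc = new StreamingContext(conf, Milliseconds(200))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker.example.com:9092",   // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "pattern-example",
      "auto.offset.reset" -> "earliest")

    // Subscribe to every topic matching "pat\d", starting partition 0 of "pat2" at offset 3
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](
        Pattern.compile("""pat\d"""),
        kafkaParams,
        Map(new TopicPartition("pat2", 0) -> 3L)))

    // Capture offset ranges on the driver before any shuffle, as in the test above
    var offsetRanges = Array[OffsetRange]()
    stream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(r => (r.key, r.value))
    }.foreachRDD { rdd =>
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    ssc.start()

Capturing offsetRanges inside transform, before any shuffle, mirrors the test: the cast to HasOffsetRanges is only valid on the RDD produced directly by the Kafka DStream, not on RDDs derived from it after repartitioning.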