aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10/src/test/java/org/apache
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2016-07-08 17:47:58 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-07-08 17:47:58 -0700
commitfd6e8f0e2269a2e7f24f79d5c2041816ea308c86 (patch)
tree09ce260a268c1016184ddb1a4faa8666a33f2338 /external/kafka-0-10/src/test/java/org/apache
parent3b22291b5f0317609cd71ce7af78e4c5063d66e8 (diff)
downloadspark-fd6e8f0e2269a2e7f24f79d5c2041816ea308c86.tar.gz
spark-fd6e8f0e2269a2e7f24f79d5c2041816ea308c86.tar.bz2
spark-fd6e8f0e2269a2e7f24f79d5c2041816ea308c86.zip
[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription
## What changes were proposed in this pull request? Allow for kafka topic subscriptions based on a regex pattern. ## How was this patch tested? Unit tests, manual tests Author: cody koeninger <cody@koeninger.org> Closes #14026 from koeninger/SPARK-13569.
Diffstat (limited to 'external/kafka-0-10/src/test/java/org/apache')
-rw-r--r--external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java15
1 files changed, 15 insertions, 0 deletions
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
+import java.util.regex.Pattern;
import scala.collection.JavaConverters;
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
+ final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
+ final ConsumerStrategy<String, String> psub1 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> psub2 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+ final ConsumerStrategy<String, String> psub3 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> psub4 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+
+ Assert.assertEquals(
+ psub1.executorKafkaParams().get("bootstrap.servers"),
+ psub3.executorKafkaParams().get("bootstrap.servers"));
+
final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =