From fd6e8f0e2269a2e7f24f79d5c2041816ea308c86 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 8 Jul 2016 17:47:58 -0700 Subject: [SPARK-13569][STREAMING][KAFKA] pattern based topic subscription ## What changes were proposed in this pull request? Allow for kafka topic subscriptions based on a regex pattern. ## How was this patch tested? Unit tests, manual tests Author: cody koeninger Closes #14026 from koeninger/SPARK-13569. --- .../streaming/kafka010/JavaConsumerStrategySuite.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'external/kafka-0-10/src/test/java/org/apache') diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index ac8d64b180..ba57b6beb2 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010; import java.io.Serializable; import java.util.*; +import java.util.regex.Pattern; import scala.collection.JavaConverters; @@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable { @Test public void testConsumerStrategyConstructors() { final String topic1 = "topic1"; + final Pattern pat = Pattern.compile("top.*"); final Collection topics = Arrays.asList(topic1); final scala.collection.Iterable sTopics = JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); @@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable { sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); + final ConsumerStrategy psub1 = + ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets); + final ConsumerStrategy psub2 = + ConsumerStrategies.SubscribePattern(pat, sKafkaParams); + final ConsumerStrategy psub3 = + ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets); + final ConsumerStrategy psub4 = + ConsumerStrategies.SubscribePattern(pat, kafkaParams); + + Assert.assertEquals( + psub1.executorKafkaParams().get("bootstrap.servers"), + psub3.executorKafkaParams().get("bootstrap.servers")); + final ConsumerStrategy asn1 = ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = -- cgit v1.2.3