diff options
Diffstat (limited to 'external/kafka-0-10/src/test/java')
-rw-r--r-- | external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index ac8d64b180..ba57b6beb2 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010; import java.io.Serializable; import java.util.*; +import java.util.regex.Pattern; import scala.collection.JavaConverters; @@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable { @Test public void testConsumerStrategyConstructors() { final String topic1 = "topic1"; + final Pattern pat = Pattern.compile("top.*"); final Collection<String> topics = Arrays.asList(topic1); final scala.collection.Iterable<String> sTopics = JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); @@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable { sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); + final ConsumerStrategy<String, String> psub1 = + ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets); + final ConsumerStrategy<String, String> psub2 = + ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams); + final ConsumerStrategy<String, String> psub3 = + ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets); + final ConsumerStrategy<String, String> psub4 = + ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams); + + Assert.assertEquals( + psub1.executorKafkaParams().get("bootstrap.servers"), + psub3.executorKafkaParams().get("bootstrap.servers")); + final ConsumerStrategy<String, String> asn1 = ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy<String, String> asn2 = |