aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java')
-rw-r--r--external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java15
1 files changed, 15 insertions, 0 deletions
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
+import java.util.regex.Pattern;
import scala.collection.JavaConverters;
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
+ final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
+ final ConsumerStrategy<String, String> psub1 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> psub2 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+ final ConsumerStrategy<String, String> psub3 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> psub4 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+
+ Assert.assertEquals(
+ psub1.executorKafkaParams().get("bootstrap.servers"),
+ psub3.executorKafkaParams().get("bootstrap.servers"));
+
final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =