author     cody koeninger <cody@koeninger.org>          2016-07-08 17:47:58 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-07-08 17:47:58 -0700
commit     fd6e8f0e2269a2e7f24f79d5c2041816ea308c86 (patch)
tree       09ce260a268c1016184ddb1a4faa8666a33f2338 /external
parent     3b22291b5f0317609cd71ce7af78e4c5063d66e8 (diff)
[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription
## What changes were proposed in this pull request?

Allow for kafka topic subscriptions based on a regex pattern.

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #14026 from koeninger/SPARK-13569.
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala      | 178
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java | 15
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 74
3 files changed, 258 insertions(+), 9 deletions(-)
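
For orientation, here is a minimal spark-shell style sketch of how the new pattern-based subscription might be used. The broker address, group id, and topic regex are illustrative placeholders, and LocationStrategies.PreferConsistent is assumed to be the location strategy factory available alongside this module; this is a sketch, not code from the patch itself.

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("subscribe-pattern-example").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(200))

// Placeholder consumer config; bootstrap.servers and group.id are illustrative.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-example",
  "auto.offset.reset" -> "earliest"
)

// Subscribe to every topic whose name matches the regex, e.g. events-1, events-2, ...
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams))

stream.map(r => (r.key, r.value)).print()
ssc.start()

As the scaladoc below notes, the pattern is matched periodically against the topics that exist at the time of each check, so matching topics created after the stream starts are picked up as well.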
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 70c3f1a98d..60255fc655 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.annotation.Experimental
-
+import org.apache.spark.internal.Logging
/**
* :: Experimental ::
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] {
/**
* Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+ * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
* This consumer will be used on the driver to query for offsets only, not messages.
+ * The consumer must be returned in a state in which it is safe to call poll(0) on it.
* @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
@@ -72,15 +75,83 @@ private case class Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
- ) extends ConsumerStrategy[K, V] {
+ ) extends ConsumerStrategy[K, V] with Logging {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
+ val toSeek = if (currentOffsets.isEmpty) {
+ offsets
+ } else {
+ currentOffsets
+ }
+ if (!toSeek.isEmpty) {
+ // work around KAFKA-3370 when reset is none
+ // poll will throw if no position, i.e. auto offset reset none and no explicit position
+ // but can't seek to a position before poll, because poll is what gets subscription partitions
+ // So, poll, suppress the first exception, then seek
+ val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+ val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ try {
+ consumer.poll(0)
+ } catch {
+ case x: NoOffsetForPartitionException if shouldSuppress =>
+ logWarning("Catching NoOffsetForPartitionException since " +
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
+ }
+ toSeek.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+}
+
+/**
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]
+ ) extends ConsumerStrategy[K, V] with Logging {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
+ val toSeek = if (currentOffsets.isEmpty) {
+ offsets
+ } else {
+ currentOffsets
+ }
+ if (!toSeek.isEmpty) {
+ // work around KAFKA-3370 when reset is none, see explanation in Subscribe above
+ val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+ val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ try {
+ consumer.poll(0)
+ } catch {
+ case x: NoOffsetForPartitionException if shouldSuppress =>
+ logWarning("Catching NoOffsetForPartitionException since " +
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
+ }
+ toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -113,8 +184,14 @@ private case class Assign[K, V](
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
+ val toSeek = if (currentOffsets.isEmpty) {
+ offsets
+ } else {
+ currentOffsets
+ }
+ if (!toSeek.isEmpty) {
+ // this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed
+ toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -215,6 +292,95 @@ object ConsumerStrategies {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
+ /** :: Experimental ::
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: collection.Map[String, Object],
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new SubscribePattern[K, V](
+ pattern,
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+ }
+
+ /** :: Experimental ::
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new SubscribePattern[K, V](
+ pattern,
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
+ }
+
+ /** :: Experimental ::
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new SubscribePattern[K, V](pattern, kafkaParams, offsets)
+ }
+
+ /** :: Experimental ::
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new SubscribePattern[K, V](
+ pattern,
+ kafkaParams,
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
+ }
+
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
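
As a side note on the KAFKA-3370 workaround used by Subscribe and SubscribePattern above (poll once so the subscription gets its partitions, suppress the NoOffsetForPartitionException that poll can throw when auto.offset.reset is none, then seek), the same idea can be sketched as a stand-alone helper. This is an illustrative sketch only; ConsumerSeekHelper and seekAfterPoll are hypothetical names, not part of the patch.

import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition

object ConsumerSeekHelper {
  // After subscribe(), poll(0) to trigger partition assignment, suppressing the
  // exception poll can throw when auto.offset.reset is "none" and the group has no
  // committed position (KAFKA-3370), then seek to the requested offsets.
  def seekAfterPoll[K, V](
      consumer: KafkaConsumer[K, V],
      kafkaParams: ju.Map[String, Object],
      toSeek: ju.Map[TopicPartition, jl.Long]): Unit = {
    if (!toSeek.isEmpty) {
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress = aor != null && "none".equalsIgnoreCase(aor.asInstanceOf[String])
      try {
        consumer.poll(0)
      } catch {
        case _: NoOffsetForPartitionException if shouldSuppress =>
          // safe to ignore: the explicit seek below establishes a position
      }
      toSeek.asScala.foreach { case (tp, offset) => consumer.seek(tp, offset) }
    }
  }
}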
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
+import java.util.regex.Pattern;
import scala.collection.JavaConverters;
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
+ final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
+ final ConsumerStrategy<String, String> psub1 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> psub2 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+ final ConsumerStrategy<String, String> psub3 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> psub4 =
+ ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+
+ Assert.assertEquals(
+ psub1.executorKafkaParams().get("bootstrap.servers"),
+ psub3.executorKafkaParams().get("bootstrap.servers"));
+
final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 0a53259802..c9e15bcba0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -103,7 +103,9 @@ class DirectKafkaStreamSuite
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
- val totalSent = data.values.sum * topics.size
+ val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
+ // one topic is starting 2 messages later
+ val expectedTotal = (data.values.sum * topics.size) - 2
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
@@ -111,7 +113,7 @@ class DirectKafkaStreamSuite
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
- ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
+ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(allReceived.size === totalSent,
+ assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
+ test("pattern based subscription") {
+ val topics = List("pat1", "pat2", "advanced3")
+ // Should match 2 out of 3 topics
+ val pat = """pat\d""".r.pattern
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+ val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
+ // 2 matching topics, one of which starts 3 messages later
+ val expectedTotal = (data.values.sum * 2) - 3
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
+ }
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+ // hold a reference to the current offset ranges, so it can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+ val tf = stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd.map(r => (r.key, r.value))
+ }
+
+ tf.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsetRanges(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify whether number of elements in each partition
+ // matches with the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ }
+
+ stream.foreachRDD { rdd =>
+ allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === expectedTotal,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+ }
+ ssc.stop()
+ }
+
+
test("receiving from largest starting offset") {
val topic = "latest"
val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
kc.close()
// Setup context and kafka stream with largest offset
+ kafkaParams.put("auto.offset.reset", "none")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](
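
Finally, as the new DirectKafkaStreamSuite test exercises, explicit starting offsets can be combined with pattern subscription. A brief spark-shell style sketch of that overload follows; the broker address, group id, topic name, partition, and offset are placeholders, and deserializer settings are omitted for brevity.

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Placeholder config; deserializers etc. would also be required before consuming.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "pattern-example"
)

// Start topic "events-1", partition 0, at offset 3. Partitions without an entry fall
// back to the committed offset (if any) or to auto.offset.reset, per the scaladoc above.
val startingOffsets = Map(new TopicPartition("events-1", 0) -> 3L)

val strategy = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams, startingOffsets)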