From fd6e8f0e2269a2e7f24f79d5c2041816ea308c86 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Fri, 8 Jul 2016 17:47:58 -0700
Subject: [SPARK-13569][STREAMING][KAFKA] pattern based topic subscription

## What changes were proposed in this pull request?

Allow for kafka topic subscriptions based on a regex pattern.

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger

Closes #14026 from koeninger/SPARK-13569.
---
 .../streaming/kafka010/ConsumerStrategy.scala   | 178 ++++++++++++++++++++-
 .../kafka010/JavaConsumerStrategySuite.java     |  15 ++
 .../kafka010/DirectKafkaStreamSuite.scala       |  74 ++++++++-
 3 files changed, 258 insertions(+), 9 deletions(-)

(limited to 'external')

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 70c3f1a98d..60255fc655 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju }
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.annotation.Experimental
-
+import org.apache.spark.internal.Logging
 
 /**
  * :: Experimental ::
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] {
 
   /**
    * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+   * See Kafka docs.
    * This consumer will be used on the driver to query for offsets only, not messages.
+   * The consumer must be returned in a state that it is safe to call poll(0) on.
    * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
    * has successfully read. Will be empty on initial start, possibly non-empty on restart from
    * checkpoint.
@@ -72,15 +75,83 @@ private case class Subscribe[K, V](
     topics: ju.Collection[jl.String],
     kafkaParams: ju.Map[String, Object],
     offsets: ju.Map[TopicPartition, jl.Long]
-  ) extends ConsumerStrategy[K, V] {
+  ) extends ConsumerStrategy[K, V] with Logging {
 
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
     val consumer = new KafkaConsumer[K, V](kafkaParams)
     consumer.subscribe(topics)
-    if (currentOffsets.isEmpty) {
-      offsets.asScala.foreach { case (topicPartition, offset) =>
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // work around KAFKA-3370 when reset is none
+      // poll will throw if no position, i.e. auto offset reset none and no explicit position
+      // but cant seek to a position before poll, because poll is what gets subscription partitions
+      // So, poll, suppress the first exception, then seek
+      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      try {
+        consumer.poll(0)
+      } catch {
+        case x: NoOffsetForPartitionException if shouldSuppress =>
+          logWarning("Catching NoOffsetForPartitionException since " +
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
+      }
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
+        consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ *
+ * configuration parameters to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class SubscribePattern[K, V](
+    pattern: ju.regex.Pattern,
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, jl.Long]
+  ) extends ConsumerStrategy[K, V] with Logging {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // work around KAFKA-3370 when reset is none, see explanation in Subscribe above
+      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      try {
+        consumer.poll(0)
+      } catch {
+        case x: NoOffsetForPartitionException if shouldSuppress =>
+          logWarning("Catching NoOffsetForPartitionException since " +
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
+      }
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
     }
@@ -113,8 +184,14 @@ private case class Assign[K, V](
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
     val consumer = new KafkaConsumer[K, V](kafkaParams)
     consumer.assign(topicPartitions)
-    if (currentOffsets.isEmpty) {
-      offsets.asScala.foreach { case (topicPartition, offset) =>
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
     }
@@ -215,6 +292,95 @@ object ConsumerStrategies {
     new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   *
+   * configuration parameters to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   * Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   *
+   * configuration parameters to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   * Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   *
+   * configuration parameters to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   * Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](pattern, kafkaParams, offsets)
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   *
+   * configuration parameters to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   * Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      kafkaParams,
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
+  }
+
   /**
    * :: Experimental ::
    * Assign a fixed collection of TopicPartitions
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.regex.Pattern;
 
 import scala.collection.JavaConverters;
 
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
   @Test
   public void testConsumerStrategyConstructors() {
     final String topic1 = "topic1";
+    final Pattern pat = Pattern.compile("top.*");
     final Collection topics = Arrays.asList(topic1);
     final scala.collection.Iterable sTopics =
       JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
       sub1.executorKafkaParams().get("bootstrap.servers"),
       sub3.executorKafkaParams().get("bootstrap.servers"));
 
+    final ConsumerStrategy psub1 =
+      ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets);
+    final ConsumerStrategy psub2 =
+      ConsumerStrategies.SubscribePattern(pat, sKafkaParams);
+    final ConsumerStrategy psub3 =
+      ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets);
+    final ConsumerStrategy psub4 =
+      ConsumerStrategies.SubscribePattern(pat, kafkaParams);
+
+    Assert.assertEquals(
+      psub1.executorKafkaParams().get("bootstrap.servers"),
+      psub3.executorKafkaParams().get("bootstrap.servers"));
+
     final ConsumerStrategy asn1 =
       ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets);
     final ConsumerStrategy asn2 =
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 0a53259802..c9e15bcba0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -103,7 +103,9 @@ class DirectKafkaStreamSuite
       kafkaTestUtils.createTopic(t)
       kafkaTestUtils.sendMessages(t, data)
     }
-    val totalSent = data.values.sum * topics.size
+    val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
+    // one topic is starting 2 messages later
+    val expectedTotal = (data.values.sum * topics.size) - 2
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
 
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
@@ -111,7 +113,7 @@ class DirectKafkaStreamSuite
       KafkaUtils.createDirectStream[String, String](
         ssc,
         preferredHosts,
-        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
+        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
     }
 
     val allReceived = new ConcurrentLinkedQueue[(String, String)]()
@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
     }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
+      assert(allReceived.size === expectedTotal,
         "didn't get expected number of messages, messages:\n" +
           allReceived.asScala.mkString("\n"))
     }
     ssc.stop()
   }
 
+  test("pattern based subscription") {
+    val topics = List("pat1", "pat2", "advanced3")
+    // Should match 2 out of 3 topics
+    val pat = """pat\d""".r.pattern
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      kafkaTestUtils.createTopic(t)
+      kafkaTestUtils.sendMessages(t, data)
+    }
+    val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
+    // 2 matching topics, one of which starts 3 messages later
+    val expectedTotal = (data.values.sum * 2) - 3
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
+    }
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+    // hold a reference to the current offset ranges, so it can be used downstream
+    var offsetRanges = Array[OffsetRange]()
+    val tf = stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd.map(r => (r.key, r.value))
+    }
+
+    tf.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        // For each partition, get size of the range in the partition,
+        // and the number of items in the partition
+        val off = offsetRanges(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        Iterator((partSize, rangeSize))
+      }.collect
+
+      // Verify whether number of elements in each partition
+      // matches with the corresponding offset range
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+    }
+
+    stream.foreachRDD { rdd =>
+      allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === expectedTotal,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+    }
+    ssc.stop()
+  }
+
+
   test("receiving from largest starting offset") {
     val topic = "latest"
     val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
     kc.close()
 
     // Setup context and kafka stream with largest offset
+    kafkaParams.put("auto.offset.reset", "none")
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
     val stream = withClue("Error creating direct stream") {
       val s = new DirectKafkaInputDStream[String, String](
--
cgit v1.2.3
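For context, here is a minimal sketch of how the pattern-based subscription added by this patch is meant to be consumed from a Spark Streaming job using spark-streaming-kafka-0-10. It is an illustration of the new API, not code from the patch: the broker address, group id, topic pattern, and local master are made-up placeholders, and LocationStrategies.PreferConsistent is the usual placement choice.

import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, KafkaUtils, LocationStrategies }

object PatternSubscriptionExample {
  def main(args: Array[String]): Unit = {
    // local[2] is only for a quick local run; in practice the master comes from spark-submit
    val conf = new SparkConf().setAppName("PatternSubscriptionExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed Kafka settings; adjust bootstrap servers and group id for a real cluster
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "pattern-subscription-example",
      "auto.offset.reset" -> "earliest")

    // Subscribe to every topic whose name matches events-.*; topics created later that
    // match the pattern are picked up when the subscription is periodically re-checked
    val pattern: Pattern = Pattern.compile("events-.*")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams))

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}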
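The explicit-offsets overloads tie into the KAFKA-3370 workaround shown in the patch: with auto.offset.reset set to none, the driver consumer's first poll(0) throws NoOffsetForPartitionException for any matched partition that has no position, and the strategy suppresses that exception before seeking to the supplied offsets. A sketch of that combination, reusing ssc, kafkaParams, and pattern from the previous example; the topic name and starting offset are made up.

import org.apache.kafka.common.TopicPartition

import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, KafkaUtils, LocationStrategies }

// Fail rather than silently reset when a matched partition has neither a committed
// offset for the group nor an entry in the explicit offsets map below
val strictParams: Map[String, Object] = kafkaParams + ("auto.offset.reset" -> "none")

// Hypothetical starting point: partition 0 of one matching topic begins at offset 100
val startingOffsets = Map(new TopicPartition("events-2016", 0) -> 100L)

val offsetStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](pattern, strictParams, startingOffsets))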