From 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Fri, 21 Oct 2016 15:55:04 -0700
Subject: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

## What changes were proposed in this pull request?

startingOffsets takes specific per-topicpartition offsets as a json argument,
usable with any consumer strategy

assign with specific topicpartitions as a consumer strategy

## How was this patch tested?

Unit tests

Author: cody koeninger

Closes #15504 from koeninger/SPARK-17812.
---
 .../org/apache/spark/sql/kafka010/JsonUtils.scala  |  93 +++++++++++++++++
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  64 ++++++++++--
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  52 +++++-----
 .../spark/sql/kafka010/StartingOffsets.scala       |  32 ++++++
 .../apache/spark/sql/kafka010/JsonUtilsSuite.scala |  45 ++++++++
 .../spark/sql/kafka010/KafkaSourceSuite.scala      | 114 +++++++++++++++++++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  14 ++-
 7 files changed, 369 insertions(+), 45 deletions(-)
 create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
 create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
(limited to 'external/kafka-0-10-sql')

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
new file mode 100644
index 0000000000..40d568a12c
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
+
+import org.apache.kafka.common.TopicPartition
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /**
+   * Read TopicPartitions from json string
+   */
+  def partitions(str: String): Array[TopicPartition] = {
+    try {
+      Serialization.read[Map[String, Seq[Int]]](str).flatMap { case (topic, parts) =>
+        parts.map { part =>
+          new TopicPartition(topic, part)
+        }
+      }.toArray
+    } catch {
+      case NonFatal(x) =>
+        throw new IllegalArgumentException(
+          s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
+    }
+  }
+
+  /**
+   * Write TopicPartitions as json string
+   */
+  def partitions(partitions: Iterable[TopicPartition]): String = {
+    val result = new HashMap[String, List[Int]]
+    partitions.foreach { tp =>
+      val parts: List[Int] = result.getOrElse(tp.topic, Nil)
+      result += tp.topic -> (tp.partition::parts)
+    }
+    Serialization.write(result)
+  }
+
+  /**
+   * Read per-TopicPartition offsets from json string
+   */
+  def partitionOffsets(str: String): Map[TopicPartition, Long] = {
+    try {
+      Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partOffsets) =>
+        partOffsets.map { case (part, offset) =>
+          new TopicPartition(topic, part) -> offset
+        }
+      }.toMap
+    } catch {
+      case NonFatal(x) =>
+        throw new IllegalArgumentException(
+          s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
+    }
+  }
+
+  /**
+   * Write per-TopicPartition offsets as json string
+   */
+  def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    partitionOffsets.foreach { case (tp, off) =>
+      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+      parts += tp.partition -> off
+      result += tp.topic -> parts
+    }
+    Serialization.write(result)
+  }
+}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 4b0bb0a0f7..537b7b0baa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
@@ -82,7 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
-    startFromEarliestOffset: Boolean,
+    startingOffsets: StartingOffsets,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -110,10 +110,10 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = if (startFromEarliestOffset) {
-        KafkaSourceOffset(fetchEarliestOffsets())
-      } else {
-        KafkaSourceOffset(fetchLatestOffsets())
+      val offsets = startingOffsets match {
+        case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
+        case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
+        case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
       }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
@@ -231,6 +231,43 @@ private[kafka010] case class KafkaSource(
 
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
+  /**
+   * Set consumer position to specified offsets, making sure all assignments are set.
+   */
+  private def fetchSpecificStartingOffsets(
+      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val result = withRetriesWithoutInterrupt {
+      // Poll to get the latest assigned partitions
+      consumer.poll(0)
+      val partitions = consumer.assignment()
+      consumer.pause(partitions)
+      assert(partitions.asScala == partitionOffsets.keySet,
+        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+
+      partitionOffsets.foreach {
+        case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
+        case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
+        case (tp, off) => consumer.seek(tp, off)
+      }
+      partitionOffsets.map {
+        case (tp, _) => tp -> consumer.position(tp)
+      }
+    }
+    partitionOffsets.foreach {
+      case (tp, off) if off != -1 && off != -2 =>
+        if (result(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+    result
+  }
+
   /**
    * Fetch the earliest offsets of partitions.
    */
@@ -273,7 +310,7 @@ private[kafka010] case class KafkaSource(
       consumer.poll(0)
       val partitions = consumer.assignment()
       consumer.pause(partitions)
-      logDebug(s"\tPartitioned assigned to consumer: $partitions")
+      logDebug(s"\tPartitions assigned to consumer: $partitions")
 
       // Get the earliest offset of each partition
       consumer.seekToBeginning(partitions)
@@ -317,6 +354,8 @@ private[kafka010] case class KafkaSource(
         try {
           result = Some(body)
         } catch {
+          case x: OffsetOutOfRangeException =>
+            reportDataLoss(x.getMessage)
           case NonFatal(e) =>
             lastException = e
             logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
@@ -373,6 +412,17 @@ private[kafka010] object KafkaSource {
     def createConsumer(): Consumer[Array[Byte], Array[Byte]]
   }
 
+  case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object])
+    extends ConsumerStrategy {
+    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+      consumer.assign(ju.Arrays.asList(partitions: _*))
+      consumer
+    }
+
+    override def toString: String = s"Assign[${partitions.mkString(", ")}]"
+  }
+
   case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
       extends ConsumerStrategy {
     override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 23b1b60f3b..585ced875c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,14 +77,12 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val startFromEarliestOffset =
-      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
-        case Some("latest") => false
-        case Some("earliest") => true
-        case Some(pos) =>
-          // This should not happen since we have already checked the options.
-          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
-        case None => false
+    val startingOffsets =
+      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => LatestOffsets
+        case Some("earliest") => EarliestOffsets
+        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
+        case None => LatestOffsets
       }
 
     val kafkaParamsForStrategy =
@@ -95,9 +93,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       // So that consumers in Kafka source do not mess with any existing group id
       .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-      // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
-      // by itself instead of counting on KafkaConsumer.
-      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+      // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
+      // offsets by itself instead of counting on KafkaConsumer.
+      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
       // So that consumers in the driver does not commit offsets unnecessarily
       .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -130,6 +128,10 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       .build()
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+      case ("assign", value) =>
+        AssignStrategy(
+          JsonUtils.partitions(value),
+          kafkaParamsForStrategy)
       case ("subscribe", value) =>
         SubscribeStrategy(
           value.split(",").map(_.trim()).filter(_.nonEmpty),
@@ -153,7 +155,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
-      startFromEarliestOffset,
+      startingOffsets,
       failOnDataLoss)
   }
 
@@ -175,6 +177,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     }
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+      case ("assign", value) =>
+        if (!value.trim.startsWith("{")) {
+          throw new IllegalArgumentException(
+            "No topicpartitions to assign as specified value for option " +
+              s"'assign' is '$value'")
+        }
+
       case ("subscribe", value) =>
         val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
         if (topics.isEmpty) {
@@ -195,14 +204,6 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         throw new IllegalArgumentException("Unknown option")
     }
 
-    caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(pos) if !STARTING_OFFSET_OPTION_VALUES.contains(pos.trim.toLowerCase) =>
-        throw new IllegalArgumentException(
-          s"Illegal value '$pos' for option '$STARTING_OFFSET_OPTION_KEY', " +
-          s"acceptable values are: ${STARTING_OFFSET_OPTION_VALUES.mkString(", ")}")
-      case _ =>
-    }
-
     // Validate user-specified Kafka options
 
     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
@@ -215,11 +216,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       throw new IllegalArgumentException(
         s"""
          |Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
-         |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
-         |specify where to start. Structured Streaming manages which offsets are consumed
+         |Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest'
+         |to specify where to start. Structured Streaming manages which offsets are consumed
          |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
          |data is missed when when new topics/partitions are dynamically subscribed. Note that
-         |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and
+         |'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and
          |that resuming will always pick up from where the query left off. See the docs for more
          |details.
        """.stripMargin)
@@ -282,8 +283,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
 }
 
 private[kafka010] object KafkaSourceProvider {
-  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern")
-  private val STARTING_OFFSET_OPTION_KEY = "startingoffset"
-  private val STARTING_OFFSET_OPTION_VALUES = Set("earliest", "latest")
+  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
+  private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
new file mode 100644
index 0000000000..83959e5971
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+/*
+ * Values that can be specified for config startingOffsets
+ */
+private[kafka010] sealed trait StartingOffsets
+
+private[kafka010] case object EarliestOffsets extends StartingOffsets
+
+private[kafka010] case object LatestOffsets extends StartingOffsets
+
+private[kafka010] case class SpecificOffsets(
+  partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
new file mode 100644
index 0000000000..54b980049d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+
+class JsonUtilsSuite extends SparkFunSuite {
+
+  test("parsing partitions") {
+    val parsed = JsonUtils.partitions("""{"topicA":[0,1],"topicB":[4,6]}""")
+    val expected = Array(
+      new TopicPartition("topicA", 0),
+      new TopicPartition("topicA", 1),
+      new TopicPartition("topicB", 4),
+      new TopicPartition("topicB", 6)
+    )
+    assert(parsed.toSeq === expected.toSeq)
+  }
+
+  test("parsing partitionOffsets") {
+    val parsed = JsonUtils.partitionOffsets(
+      """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
+
+    assert(parsed(new TopicPartition("topicA", 0)) === 23)
+    assert(parsed(new TopicPartition("topicA", 1)) === -1)
+    assert(parsed(new TopicPartition("topicB", 0)) === -2)
+  }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 8b5296ea13..b50688ecb7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming._
@@ -52,7 +53,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
     // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
     // its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
-    // we don't know which data should be fetched when `startingOffset` is latest.
+    // we don't know which data should be fetched when `startingOffsets` is latest.
     q.processAllAvailable()
     true
   }
@@ -155,26 +156,52 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("assign from latest offsets") {
+    val topic = newTopic()
+    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+  }
+
+  test("assign from earliest offsets") {
+    val topic = newTopic()
+    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+  }
+
+  test("assign from specific offsets") {
+    val topic = newTopic()
+    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
+  }
+
   test("subscribing topic by name from latest offsets") {
     val topic = newTopic()
-    testFromLatestOffsets(topic, "subscribe" -> topic)
+    testFromLatestOffsets(topic, true, "subscribe" -> topic)
   }
 
   test("subscribing topic by name from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, "subscribe" -> topic)
+    testFromEarliestOffsets(topic, true, "subscribe" -> topic)
+  }
+
+  test("subscribing topic by name from specific offsets") {
+    val topic = newTopic()
+    testFromSpecificOffsets(topic, "subscribe" -> topic)
   }
 
   test("subscribing topic by pattern from latest offsets") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-suffix"
-    testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+    testFromLatestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
   }
 
   test("subscribing topic by pattern from earliest offsets") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-suffix"
-    testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+    testFromEarliestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern from specific offsets") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromSpecificOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
   }
 
   test("subscribing topic by pattern with topic deletions") {
@@ -233,6 +260,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
     testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
       "only one", "options can be specified")
 
+    testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
+      "only one", "options can be specified")
+
+    testBadOptions("assign" -> "")("no topicpartitions to assign")
     testBadOptions("subscribe" -> "")("no topics to subscribe")
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }
@@ -293,7 +324,61 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
 
-  private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
+  private def assignString(topic: String, partitions: Iterable[Int]): String = {
+    JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  private def testFromSpecificOffsets(topic: String, options: (String, String)*): Unit = {
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> -2L,
+      new TopicPartition(topic, 1) -> -1L,
+      new TopicPartition(topic, 2) -> 0L,
+      new TopicPartition(topic, 3) -> 1L,
+      new TopicPartition(topic, 4) -> 2L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    testUtils.createTopic(topic, partitions = 5)
+    // part 0 starts at earliest, these should all be seen
+    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+    // part 1 starts at latest, these should all be skipped
+    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+    // part 2 starts at 0, these should all be seen
+    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+    // part 3 starts at 1, first should be skipped
+    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+    // part 4 starts at 2, first and second should be skipped
+    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("startingOffsets", startingOffsets)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
+      StopStream,
+      StartStream(),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
+  private def testFromLatestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      options: (String, String)*): Unit = {
     testUtils.createTopic(topic, partitions = 5)
     testUtils.sendMessages(topic, Array("-1"))
     require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -301,7 +386,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
     val reader = spark
       .readStream
       .format("kafka")
-      .option("startingOffset", s"latest")
+      .option("startingOffsets", s"latest")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.metadata.max.age.ms", "1")
    options.foreach { case (k, v) => reader.option(k, v) }
@@ -324,7 +409,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
       AddKafkaData(Set(topic), 7, 8),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
       AssertOnQuery("Add partitions") { query: StreamExecution =>
-        testUtils.addPartitions(topic, 10)
+        if (addPartitions) {
+          testUtils.addPartitions(topic, 10)
+        }
         true
       },
       AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
@@ -332,7 +419,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = {
+  private def testFromEarliestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      options: (String, String)*): Unit = {
     testUtils.createTopic(topic, partitions = 5)
     testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
     require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -340,7 +430,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
     val reader = spark.readStream
     reader
       .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-      .option("startingOffset", s"earliest")
+      .option("startingOffsets", s"earliest")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.metadata.max.age.ms", "1")
    options.foreach { case (k, v) => reader.option(k, v) }
@@ -360,7 +450,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
       StartStream(),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
       AssertOnQuery("Add partitions") { query: StreamExecution =>
-        testUtils.addPartitions(topic, 10)
+        if (addPartitions) {
+          testUtils.addPartitions(topic, 10)
+        }
         true
       },
       AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 3eb8a737ba..9b24ccdd56 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -201,11 +201,23 @@ class KafkaTestUtils extends Logging {
 
   /** Send the array of messages to the Kafka broker */
   def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+    sendMessages(topic, messages, None)
+  }
+
+  /** Send the array of messages to the Kafka broker using specified partition */
+  def sendMessages(
+      topic: String,
+      messages: Array[String],
+      partition: Option[Int]): Seq[(String, RecordMetadata)] = {
     producer = new KafkaProducer[String, String](producerConfiguration)
     val offsets = try {
       messages.map { m =>
+        val record = partition match {
+          case Some(p) => new ProducerRecord[String, String](topic, p, null, m)
+          case None => new ProducerRecord[String, String](topic, m)
+        }
         val metadata =
-          producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+          producer.send(record).get(10, TimeUnit.SECONDS)
         logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}")
         (m, metadata)
       }
-- cgit v1.2.3
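For readers who want to try the feature this patch adds, below is a minimal usage sketch of the new 'assign' and 'startingOffsets' source options, following the option names and JSON formats exercised in KafkaSourceSuite above. The broker address, topic name, partition numbers, and offsets are placeholders; as in fetchSpecificStartingOffsets, -2 stands for earliest and -1 for latest in a specific-offsets JSON.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-assign-example").getOrCreate()

    // Read only partitions 0 and 1 of "topicA", starting partition 0 at offset 23
    // and partition 1 at the latest available offset.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")              // placeholder broker address
      .option("assign", """{"topicA":[0,1]}""")                     // JSON map of topic -> partitions
      .option("startingOffsets", """{"topicA":{"0":23,"1":-1}}""")  // per-partition starting offsets
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

As the validation message in KafkaSourceProvider notes, startingOffsets only applies when a new streaming query is started; a restarted query resumes from the offsets recorded in its checkpoint.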