author      cody koeninger <cody@koeninger.org>        2016-10-21 15:55:04 -0700
committer   Shixiong Zhu <shixiong@databricks.com>     2016-10-21 15:55:04 -0700
commit      268ccb9a48dfefc4d7bc85155e7e20a2dfe89307 (patch)
tree        799a06a1de6b8767824654ecb2309da8e4f2543a /external
parent      140570252fd3739d6bdcadd6d4d5a180e480d3e0 (diff)
[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream
## What changes were proposed in this pull request?

startingOffsets takes specific per-topicpartition offsets as a json argument, usable with any consumer strategy

assign with specific topicpartitions as a consumer strategy

## How was this patch tested?

Unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #15504 from koeninger/SPARK-17812.
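For orientation before the diff, here is a minimal sketch of how the two new options combine in a streaming query. The broker address, topic name, offsets, and checkpoint path below are illustrative assumptions, not taken from this patch.

import org.apache.spark.sql.SparkSession

object AssignSpecificOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("assign-specific-offsets").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")          // assumed broker
      .option("assign", """{"topicA":[0,1]}""")                     // new consumer strategy
      .option("startingOffsets", """{"topicA":{"0":23,"1":-1}}""")  // new per-partition form
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/assign-specific-offsets") // assumed path
      .start()
      .awaitTermination()
  }
}

In the offsets JSON, -1 stands for the latest offset of a partition and -2 for the earliest, mirroring the assertion message added to KafkaSource further down.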
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala | 93
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 64
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 52
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala | 32
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala | 45
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 114
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 14
7 files changed, 369 insertions, 45 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
new file mode 100644
index 0000000000..40d568a12c
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
+
+import org.apache.kafka.common.TopicPartition
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ /**
+ * Read TopicPartitions from json string
+ */
+ def partitions(str: String): Array[TopicPartition] = {
+ try {
+ Serialization.read[Map[String, Seq[Int]]](str).flatMap { case (topic, parts) =>
+ parts.map { part =>
+ new TopicPartition(topic, part)
+ }
+ }.toArray
+ } catch {
+ case NonFatal(x) =>
+ throw new IllegalArgumentException(
+ s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
+ }
+ }
+
+ /**
+ * Write TopicPartitions as json string
+ */
+ def partitions(partitions: Iterable[TopicPartition]): String = {
+ val result = new HashMap[String, List[Int]]
+ partitions.foreach { tp =>
+ val parts: List[Int] = result.getOrElse(tp.topic, Nil)
+ result += tp.topic -> (tp.partition::parts)
+ }
+ Serialization.write(result)
+ }
+
+ /**
+ * Read per-TopicPartition offsets from json string
+ */
+ def partitionOffsets(str: String): Map[TopicPartition, Long] = {
+ try {
+ Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partOffsets) =>
+ partOffsets.map { case (part, offset) =>
+ new TopicPartition(topic, part) -> offset
+ }
+ }.toMap
+ } catch {
+ case NonFatal(x) =>
+ throw new IllegalArgumentException(
+ s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
+ }
+ }
+
+ /**
+ * Write per-TopicPartition offsets as json string
+ */
+ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
+ val result = new HashMap[String, HashMap[Int, Long]]()
+ partitionOffsets.foreach { case (tp, off) =>
+ val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+ parts += tp.partition -> off
+ result += tp.topic -> parts
+ }
+ Serialization.write(result)
+ }
+}
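As an illustrative round trip through the two helpers above (a hypothetical demo object, assuming it is compiled into the same org.apache.spark.sql.kafka010 package, since JsonUtils is private to it), the JSON shapes match the error-message examples in the code:

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

object JsonUtilsRoundTrip {
  def main(args: Array[String]): Unit = {
    // {"topicA":[0,1]}  <->  TopicPartition(topicA, 0), TopicPartition(topicA, 1)
    val parts: Array[TopicPartition] = JsonUtils.partitions("""{"topicA":[0,1]}""")
    println(JsonUtils.partitions(parts.toSeq)) // partition order within a topic is not preserved

    // {"topicA":{"0":23,"1":-1}}  <->  Map(topicA-0 -> 23, topicA-1 -> -1)
    val offsets: Map[TopicPartition, Long] =
      JsonUtils.partitionOffsets("""{"topicA":{"0":23,"1":-1}}""")
    println(JsonUtils.partitionOffsets(offsets))
  }
}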
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 4b0bb0a0f7..537b7b0baa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
@@ -82,7 +82,7 @@ private[kafka010] case class KafkaSource(
executorKafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
metadataPath: String,
- startFromEarliestOffset: Boolean,
+ startingOffsets: StartingOffsets,
failOnDataLoss: Boolean)
extends Source with Logging {
@@ -110,10 +110,10 @@ private[kafka010] case class KafkaSource(
private lazy val initialPartitionOffsets = {
val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
metadataLog.get(0).getOrElse {
- val offsets = if (startFromEarliestOffset) {
- KafkaSourceOffset(fetchEarliestOffsets())
- } else {
- KafkaSourceOffset(fetchLatestOffsets())
+ val offsets = startingOffsets match {
+ case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
+ case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
+ case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
@@ -232,6 +232,43 @@ private[kafka010] case class KafkaSource(
override def toString(): String = s"KafkaSource[$consumerStrategy]"
/**
+ * Set consumer position to specified offsets, making sure all assignments are set.
+ */
+ private def fetchSpecificStartingOffsets(
+ partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+ val result = withRetriesWithoutInterrupt {
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ consumer.pause(partitions)
+ assert(partitions.asScala == partitionOffsets.keySet,
+ "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+ "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+ s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
+ logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+
+ partitionOffsets.foreach {
+ case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
+ case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
+ case (tp, off) => consumer.seek(tp, off)
+ }
+ partitionOffsets.map {
+ case (tp, _) => tp -> consumer.position(tp)
+ }
+ }
+ partitionOffsets.foreach {
+ case (tp, off) if off != -1 && off != -2 =>
+ if (result(tp) != off) {
+ reportDataLoss(
+ s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+ }
+ case _ =>
+ // no real way to check that beginning or end is reasonable
+ }
+ result
+ }
+
+ /**
* Fetch the earliest offsets of partitions.
*/
private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
@@ -273,7 +310,7 @@ private[kafka010] case class KafkaSource(
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
- logDebug(s"\tPartitioned assigned to consumer: $partitions")
+ logDebug(s"\tPartitions assigned to consumer: $partitions")
// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
@@ -317,6 +354,8 @@ private[kafka010] case class KafkaSource(
try {
result = Some(body)
} catch {
+ case x: OffsetOutOfRangeException =>
+ reportDataLoss(x.getMessage)
case NonFatal(e) =>
lastException = e
logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
@@ -373,6 +412,17 @@ private[kafka010] object KafkaSource {
def createConsumer(): Consumer[Array[Byte], Array[Byte]]
}
+ case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object])
+ extends ConsumerStrategy {
+ override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ consumer.assign(ju.Arrays.asList(partitions: _*))
+ consumer
+ }
+
+ override def toString: String = s"Assign[${partitions.mkString(", ")}]"
+ }
+
case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
extends ConsumerStrategy {
override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
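The special values handled by fetchSpecificStartingOffsets above translate to plain consumer seeks. A standalone sketch of the same semantics against a raw KafkaConsumer follows; the broker address, group id, topics, and offsets are assumptions for illustration only.

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SeekSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val props = new ju.Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker
    props.put("group.id", "seek-semantics-sketch")   // assumed group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val partitionOffsets = Map(                // same encoding as the startingOffsets JSON
      new TopicPartition("topicA", 0) -> 23L,  // exact offset
      new TopicPartition("topicA", 1) -> -1L,  // latest
      new TopicPartition("topicB", 0) -> -2L)  // earliest

    consumer.assign(partitionOffsets.keys.toSeq.asJava)
    partitionOffsets.foreach {
      case (tp, -1)  => consumer.seekToEnd(ju.Arrays.asList(tp))
      case (tp, -2)  => consumer.seekToBeginning(ju.Arrays.asList(tp))
      case (tp, off) => consumer.seek(tp, off)
    }
    // position() resolves the -1/-2 placeholders to concrete offsets, which is what
    // fetchSpecificStartingOffsets returns and later cross-checks for data loss.
    partitionOffsets.keys.foreach(tp => println(s"$tp starts at ${consumer.position(tp)}"))
    consumer.close()
  }
}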
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 23b1b60f3b..585ced875c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,14 +77,12 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
- val startFromEarliestOffset =
- caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
- case Some("latest") => false
- case Some("earliest") => true
- case Some(pos) =>
- // This should not happen since we have already checked the options.
- throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
- case None => false
+ val startingOffsets =
+ caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
+ case Some("latest") => LatestOffsets
+ case Some("earliest") => EarliestOffsets
+ case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
+ case None => LatestOffsets
}
val kafkaParamsForStrategy =
@@ -95,9 +93,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// So that consumers in Kafka source do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
- // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
- // by itself instead of counting on KafkaConsumer.
- .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+ // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
+ // offsets by itself instead of counting on KafkaConsumer.
+ .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// So that consumers in the driver does not commit offsets unnecessarily
.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -130,6 +128,10 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
.build()
val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+ case ("assign", value) =>
+ AssignStrategy(
+ JsonUtils.partitions(value),
+ kafkaParamsForStrategy)
case ("subscribe", value) =>
SubscribeStrategy(
value.split(",").map(_.trim()).filter(_.nonEmpty),
@@ -153,7 +155,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
kafkaParamsForExecutors,
parameters,
metadataPath,
- startFromEarliestOffset,
+ startingOffsets,
failOnDataLoss)
}
@@ -175,6 +177,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
}
val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+ case ("assign", value) =>
+ if (!value.trim.startsWith("{")) {
+ throw new IllegalArgumentException(
+ "No topicpartitions to assign as specified value for option " +
+ s"'assign' is '$value'")
+ }
+
case ("subscribe", value) =>
val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
if (topics.isEmpty) {
@@ -195,14 +204,6 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
throw new IllegalArgumentException("Unknown option")
}
- caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
- case Some(pos) if !STARTING_OFFSET_OPTION_VALUES.contains(pos.trim.toLowerCase) =>
- throw new IllegalArgumentException(
- s"Illegal value '$pos' for option '$STARTING_OFFSET_OPTION_KEY', " +
- s"acceptable values are: ${STARTING_OFFSET_OPTION_VALUES.mkString(", ")}")
- case _ =>
- }
-
// Validate user-specified Kafka options
if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
@@ -215,11 +216,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
throw new IllegalArgumentException(
s"""
|Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
- |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
- |specify where to start. Structured Streaming manages which offsets are consumed
+ |Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest'
+ |to specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
|data is missed when new topics/partitions are dynamically subscribed. Note that
- |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and
+ |'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and
|that resuming will always pick up from where the query left off. See the docs for more
|details.
""".stripMargin)
@@ -282,8 +283,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
}
private[kafka010] object KafkaSourceProvider {
- private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern")
- private val STARTING_OFFSET_OPTION_KEY = "startingoffset"
- private val STARTING_OFFSET_OPTION_VALUES = Set("earliest", "latest")
+ private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
+ private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
}
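The subscribe and subscribePattern strategies accept the same startingOffsets values as assign: "earliest", "latest", or a JSON map, with latest as the default. A brief sketch, assuming a broker at localhost:9092 and illustrative topic names:

import org.apache.spark.sql.SparkSession

object SubscribeStartingOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("subscribe-starting-offsets").getOrCreate()

    // Subscribe to explicit topics and read everything still retained on the brokers.
    val byName = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA,topicB")
      .option("startingOffsets", "earliest")
      .load()

    // Subscribe by pattern; "latest" is also the default when startingOffsets is omitted.
    val byPattern = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribePattern", "topic.*")
      .option("startingOffsets", "latest")
      .load()

    byName.printSchema()
    byPattern.printSchema()
  }
}

Setting kafka.auto.offset.reset directly is still rejected by the provider; startingOffsets is the supported way to control where a new query starts, and resuming always picks up from where the query left off.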
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
new file mode 100644
index 0000000000..83959e5971
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+/*
+ * Values that can be specified for config startingOffsets
+ */
+private[kafka010] sealed trait StartingOffsets
+
+private[kafka010] case object EarliestOffsets extends StartingOffsets
+
+private[kafka010] case object LatestOffsets extends StartingOffsets
+
+private[kafka010] case class SpecificOffsets(
+ partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
new file mode 100644
index 0000000000..54b980049d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+
+class JsonUtilsSuite extends SparkFunSuite {
+
+ test("parsing partitions") {
+ val parsed = JsonUtils.partitions("""{"topicA":[0,1],"topicB":[4,6]}""")
+ val expected = Array(
+ new TopicPartition("topicA", 0),
+ new TopicPartition("topicA", 1),
+ new TopicPartition("topicB", 4),
+ new TopicPartition("topicB", 6)
+ )
+ assert(parsed.toSeq === expected.toSeq)
+ }
+
+ test("parsing partitionOffsets") {
+ val parsed = JsonUtils.partitionOffsets(
+ """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
+
+ assert(parsed(new TopicPartition("topicA", 0)) === 23)
+ assert(parsed(new TopicPartition("topicA", 1)) === -1)
+ assert(parsed(new TopicPartition("topicB", 0)) === -2)
+ }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 8b5296ea13..b50688ecb7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.execution.streaming._
@@ -52,7 +53,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
// Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
// its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
- // we don't know which data should be fetched when `startingOffset` is latest.
+ // we don't know which data should be fetched when `startingOffsets` is latest.
q.processAllAvailable()
true
}
@@ -155,26 +156,52 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
+ test("assign from latest offsets") {
+ val topic = newTopic()
+ testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+ }
+
+ test("assign from earliest offsets") {
+ val topic = newTopic()
+ testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+ }
+
+ test("assign from specific offsets") {
+ val topic = newTopic()
+ testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
+ }
+
test("subscribing topic by name from latest offsets") {
val topic = newTopic()
- testFromLatestOffsets(topic, "subscribe" -> topic)
+ testFromLatestOffsets(topic, true, "subscribe" -> topic)
}
test("subscribing topic by name from earliest offsets") {
val topic = newTopic()
- testFromEarliestOffsets(topic, "subscribe" -> topic)
+ testFromEarliestOffsets(topic, true, "subscribe" -> topic)
+ }
+
+ test("subscribing topic by name from specific offsets") {
+ val topic = newTopic()
+ testFromSpecificOffsets(topic, "subscribe" -> topic)
}
test("subscribing topic by pattern from latest offsets") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
- testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+ testFromLatestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
}
test("subscribing topic by pattern from earliest offsets") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
- testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+ testFromEarliestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test("subscribing topic by pattern from specific offsets") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromSpecificOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
}
test("subscribing topic by pattern with topic deletions") {
@@ -233,6 +260,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
"only one", "options can be specified")
+ testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
+ "only one", "options can be specified")
+
+ testBadOptions("assign" -> "")("no topicpartitions to assign")
testBadOptions("subscribe" -> "")("no topics to subscribe")
testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
}
@@ -293,7 +324,61 @@ class KafkaSourceSuite extends KafkaSourceTest {
private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
- private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
+ private def assignString(topic: String, partitions: Iterable[Int]): String = {
+ JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+ }
+
+ private def testFromSpecificOffsets(topic: String, options: (String, String)*): Unit = {
+ val partitionOffsets = Map(
+ new TopicPartition(topic, 0) -> -2L,
+ new TopicPartition(topic, 1) -> -1L,
+ new TopicPartition(topic, 2) -> 0L,
+ new TopicPartition(topic, 3) -> 1L,
+ new TopicPartition(topic, 4) -> 2L
+ )
+ val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+ testUtils.createTopic(topic, partitions = 5)
+ // part 0 starts at earliest, these should all be seen
+ testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+ // part 1 starts at latest, these should all be skipped
+ testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+ // part 2 starts at 0, these should all be seen
+ testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+ // part 3 starts at 1, first should be skipped
+ testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+ // part 4 starts at 2, first and second should be skipped
+ testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("startingOffsets", startingOffsets)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
+ StopStream,
+ StartStream(),
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
+ AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
+ StopStream
+ )
+ }
+
+ private def testFromLatestOffsets(
+ topic: String,
+ addPartitions: Boolean,
+ options: (String, String)*): Unit = {
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, Array("-1"))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -301,7 +386,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
val reader = spark
.readStream
.format("kafka")
- .option("startingOffset", s"latest")
+ .option("startingOffsets", s"latest")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
options.foreach { case (k, v) => reader.option(k, v) }
@@ -324,7 +409,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
AddKafkaData(Set(topic), 7, 8),
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
AssertOnQuery("Add partitions") { query: StreamExecution =>
- testUtils.addPartitions(topic, 10)
+ if (addPartitions) {
+ testUtils.addPartitions(topic, 10)
+ }
true
},
AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
@@ -332,7 +419,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
- private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = {
+ private def testFromEarliestOffsets(
+ topic: String,
+ addPartitions: Boolean,
+ options: (String, String)*): Unit = {
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -340,7 +430,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
val reader = spark.readStream
reader
.format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
- .option("startingOffset", s"earliest")
+ .option("startingOffsets", s"earliest")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
options.foreach { case (k, v) => reader.option(k, v) }
@@ -360,7 +450,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
StartStream(),
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
AssertOnQuery("Add partitions") { query: StreamExecution =>
- testUtils.addPartitions(topic, 10)
+ if (addPartitions) {
+ testUtils.addPartitions(topic, 10)
+ }
true
},
AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 3eb8a737ba..9b24ccdd56 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -201,11 +201,23 @@ class KafkaTestUtils extends Logging {
/** Send the array of messages to the Kafka broker */
def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+ sendMessages(topic, messages, None)
+ }
+
+ /** Send the array of messages to the Kafka broker using specified partition */
+ def sendMessages(
+ topic: String,
+ messages: Array[String],
+ partition: Option[Int]): Seq[(String, RecordMetadata)] = {
producer = new KafkaProducer[String, String](producerConfiguration)
val offsets = try {
messages.map { m =>
+ val record = partition match {
+ case Some(p) => new ProducerRecord[String, String](topic, p, null, m)
+ case None => new ProducerRecord[String, String](topic, m)
+ }
val metadata =
- producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+ producer.send(record).get(10, TimeUnit.SECONDS)
logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}")
(m, metadata)
}