author     Shixiong Zhu <shixiong@databricks.com>    2016-10-05 16:45:45 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2016-10-05 16:45:45 -0700
commit     9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db (patch)
tree       ece99a6177b900c44cca0a5fa4596c0f41c2cc13 /external/kafka-0-10-sql/src
parent     5fd54b994e2078dbf0794932b4e0ffa9a9eda0c3 (diff)
[SPARK-17346][SQL] Add Kafka source for Structured Streaming
## What changes were proposed in this pull request?

This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source. It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

tdas did most of the work, and part of it was inspired by koeninger's work.

### Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the data it reads is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can handle topic deletion. However, the user should make sure no Spark job is processing the data when a topic is deleted.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which is from the earliest offset, or "latest", which is just from the latest offset. Note: this only applies when a new streaming query is started; resuming will always pick up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it's possible that data has been lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it doesn't work as expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
kafkaConsumer.pollTimeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`.

### Usage

* Subscribe to 1 topic

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes #15102 from zsxwing/kafka-source.
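As an additional (non-original) illustration of the schema and options above, the following sketch projects the binary `key`/`value` columns to strings and sets `startingOffset` and `failOnDataLoss`; the bootstrap address and topic name are placeholders.

```Scala
// Sketch: read a placeholder topic from the earliest offsets and cast the binary
// key/value columns (see the schema table above) to strings.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")  // placeholder address
  .option("subscribe", "topic1")                   // placeholder topic
  .option("startingOffset", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset")
```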
Diffstat (limited to 'external/kafka-0-10-sql/src')
-rw-r--r--  external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister | 1
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala | 152
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 399
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala | 54
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 282
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 148
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package-info.java | 21
-rw-r--r--  external/kafka-0-10-sql/src/test/resources/log4j.properties | 28
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 39
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 424
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 339
11 files changed, 1887 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000000..2f9e9fc039
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.kafka010.KafkaSourceProvider
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
new file mode 100644
index 0000000000..3b5a96534f
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of a single TopicPartition, intended for cached reuse.
+ * The underlying consumer is not thread-safe, so neither is this class;
+ * but processing the same TopicPartition and group id in multiple threads is usually bad anyway.
+ */
+private[kafka010] case class CachedKafkaConsumer private(
+ topicPartition: TopicPartition,
+ kafkaParams: ju.Map[String, Object]) extends Logging {
+
+ private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+ private val consumer = {
+ val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val tps = new ju.ArrayList[TopicPartition]()
+ tps.add(topicPartition)
+ c.assign(tps)
+ c
+ }
+
+ /** Iterator over the already fetched data */
+ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+ private var nextOffsetInFetchedData = -2L
+
+ /**
+ * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
+ * Sequential forward access will use buffers, but random access will be horribly inefficient.
+ */
+ def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+ if (offset != nextOffsetInFetchedData) {
+ logInfo(s"Initial fetch for $topicPartition $offset")
+ seek(offset)
+ poll(pollTimeoutMs)
+ }
+
+ if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
+ assert(fetchedData.hasNext(),
+ s"Failed to get records for $groupId $topicPartition $offset " +
+ s"after polling for $pollTimeoutMs")
+ var record = fetchedData.next()
+
+ if (record.offset != offset) {
+ logInfo(s"Buffer miss for $groupId $topicPartition $offset")
+ seek(offset)
+ poll(pollTimeoutMs)
+ assert(fetchedData.hasNext(),
+ s"Failed to get records for $groupId $topicPartition $offset " +
+ s"after polling for $pollTimeoutMs")
+ record = fetchedData.next()
+ assert(record.offset == offset,
+ s"Got wrong record for $groupId $topicPartition even after seeking to offset $offset")
+ }
+
+ nextOffsetInFetchedData = offset + 1
+ record
+ }
+
+ private def close(): Unit = consumer.close()
+
+ private def seek(offset: Long): Unit = {
+ logDebug(s"Seeking to $groupId $topicPartition $offset")
+ consumer.seek(topicPartition, offset)
+ }
+
+ private def poll(pollTimeoutMs: Long): Unit = {
+ val p = consumer.poll(pollTimeoutMs)
+ val r = p.records(topicPartition)
+ logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
+ fetchedData = r.iterator
+ }
+}
+
+private[kafka010] object CachedKafkaConsumer extends Logging {
+
+ private case class CacheKey(groupId: String, topicPartition: TopicPartition)
+
+ private lazy val cache = {
+ val conf = SparkEnv.get.conf
+ val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
+ new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
+ override def removeEldestEntry(
+ entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
+ if (this.size > capacity) {
+ logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
+ s"removing consumer for ${entry.getKey}")
+ try {
+ entry.getValue.close()
+ } catch {
+ case e: SparkException =>
+ logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
+ }
+ true
+ } else {
+ false
+ }
+ }
+ }
+ }
+
+ /**
+ * Get a cached consumer for groupId, assigned to topic and partition.
+ * If a matching consumer doesn't already exist, it will be created using kafkaParams.
+ */
+ def getOrCreate(
+ topic: String,
+ partition: Int,
+ kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+ val topicPartition = new TopicPartition(topic, partition)
+ val key = CacheKey(groupId, topicPartition)
+
+ // If this is a reattempt at running the task, then invalidate the cache and start with
+ // a new consumer
+ if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
+ cache.remove(key)
+ new CachedKafkaConsumer(topicPartition, kafkaParams)
+ } else {
+ if (!cache.containsKey(key)) {
+ cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
+ }
+ cache.get(key)
+ }
+ }
+}
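To illustrate the consumer cache defined above, here is a minimal, hypothetical sketch of how an executor-side reader could fetch a few sequential records through `CachedKafkaConsumer.getOrCreate` and `get`. The bootstrap address, group id, topic, and offsets are placeholders, and the snippet assumes it runs inside the `org.apache.spark.sql.kafka010` package (the class is `private[kafka010]`).

```Scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical executor-side Kafka params; all values below are placeholders.
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-kafka-source-example-executor")
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

// One cached consumer per (group id, topic, partition); sequential gets reuse the fetch buffer.
val consumer = CachedKafkaConsumer.getOrCreate("topic1", 0, kafkaParams)
(100L until 103L).foreach { offset =>
  val record = consumer.get(offset, pollTimeoutMs = 512)
  println(s"offset=${record.offset} valueBytes=${record.value.length}")
}
```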
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
new file mode 100644
index 0000000000..1be70db874
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to read data from Kafka. The design
+ * for this source is as follows.
+ *
+ * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
+ * by this source. These strategies directly correspond to the different consumption options
+ * in Kafka. This class is designed to return a configured [[KafkaConsumer]] that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for more details.
+ *
+ * - The [[KafkaSource]] is written to do the following.
+ *
+ * - As soon as the source is created, the pre-configured KafkaConsumer returned by the
+ * [[ConsumerStrategy]] is used to query the initial offsets that this source should
+ * start reading from. This is used to create the first batch.
+ *
+ * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are
+ * returned as a [[KafkaSourceOffset]].
+ *
+ * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset'
+ * for each partition. The end offset is excluded to be consistent with the semantics of
+ * [[KafkaSourceOffset]] and `KafkaConsumer.position()`.
+ *
+ * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
+ * data from Kafka topic + partition is consistently read by the same executors across
+ * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
+ * docs on [[KafkaSourceRDD]] for more details.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query using KafkaSource may not be able to stop.
+ * To avoid this issue, make sure the query is stopped before stopping the Kafka brokers, and do
+ * not use incorrect broker addresses.
+ */
+private[kafka010] case class KafkaSource(
+ sqlContext: SQLContext,
+ consumerStrategy: ConsumerStrategy,
+ executorKafkaParams: ju.Map[String, Object],
+ sourceOptions: Map[String, String],
+ metadataPath: String,
+ failOnDataLoss: Boolean)
+ extends Source with Logging {
+
+ private val sc = sqlContext.sparkContext
+
+ private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+ private val maxOffsetFetchAttempts =
+ sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+ private val offsetFetchAttemptIntervalMs =
+ sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+
+ /**
+ * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
+ * offsets and never commits them.
+ */
+ private val consumer = consumerStrategy.createConsumer()
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = {
+ val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ override def schema: StructType = KafkaSource.kafkaSchema
+
+ /** Returns the maximum available offset for this source. */
+ override def getOffset: Option[Offset] = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+ logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
+ Some(offset)
+ }
+
+ /**
+ * Returns the data that is between the offsets
+ * [`start.get.partitionToOffsets`, `end.partitionToOffsets`), i.e. end.partitionToOffsets is
+ * exclusive.
+ */
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ logInfo(s"GetBatch called with start = $start, end = $end")
+ val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
+ val fromPartitionOffsets = start match {
+ case Some(prevBatchEndOffset) =>
+ KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
+ case None =>
+ initialPartitionOffsets
+ }
+
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
+ val newPartitionOffsets = if (newPartitions.nonEmpty) {
+ fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
+ } else {
+ Map.empty[TopicPartition, Long]
+ }
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get the starting offsets for some partitions, which means they have been deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions whose starting offsets we don't know.
+ newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList(sc)
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val offsetRanges = topicPartitions.map { tp =>
+ val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = untilPartitionOffsets(tp)
+ val preferredLoc = if (numExecutors > 0) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ Some(sortedExecutors(floorMod(tp.hashCode, numExecutors)))
+ } else None
+ KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
+ }.filter { range =>
+ if (range.untilOffset < range.fromOffset) {
+ reportDataLoss(s"Partition ${range.topicPartition}'s offset was changed from " +
+ s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
+ false
+ } else {
+ true
+ }
+ }.toArray
+
+ // Create an RDD that reads from Kafka and gets the (key, value) pairs as byte arrays.
+ val rdd = new KafkaSourceRDD(
+ sc, executorKafkaParams, offsetRanges, pollTimeoutMs).map { cr =>
+ Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp, cr.timestampType.id)
+ }
+
+ logInfo("GetBatch generating RDD of offset range: " +
+ offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+ sqlContext.createDataFrame(rdd, schema)
+ }
+
+ /** Stop this source and free any resources it has allocated. */
+ override def stop(): Unit = synchronized {
+ consumer.close()
+ }
+
+ override def toString(): String = s"KafkaSource[$consumerStrategy]"
+
+ /**
+ * Fetch the offsets of the assigned partitions, either seeking to the latest offsets or using the current offsets
+ * in the consumer.
+ */
+ private def fetchPartitionOffsets(
+ seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+ // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+ assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ consumer.pause(partitions)
+ logDebug(s"Partitioned assigned to consumer: $partitions")
+
+ // Get the current or latest offset of each partition
+ if (seekToEnd) {
+ consumer.seekToEnd(partitions)
+ logDebug("Seeked to the end")
+ }
+ val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ logDebug(s"Got offsets for partition : $partitionOffsets")
+ partitionOffsets
+ }
+
+ /**
+ * Fetch the earliest offsets for newly discovered partitions. The returned result may not contain
+ * some partitions if they have been deleted.
+ */
+ private def fetchNewPartitionEarliestOffsets(
+ newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+ // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+ assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ logDebug(s"\tPartitioned assigned to consumer: $partitions")
+
+ // Get the earliest offset of each partition
+ consumer.seekToBeginning(partitions)
+ val partitionToOffsets = newPartitions.filter { p =>
+ // When topic deletions happen at the same time, some partitions may not be in `partitions`,
+ // so we need to ignore them.
+ partitions.contains(p)
+ }.map(p => p -> consumer.position(p)).toMap
+ logDebug(s"Got offsets for new partitions: $partitionToOffsets")
+ partitionToOffsets
+ }
+
+ /**
+ * Helper function that does multiple retries on a body of code that returns offsets.
+ * Retries are needed to handle transient failures. For example, race conditions between getting
+ * the assignment and getting the position while topics/partitions are deleted can cause NPEs.
+ *
+ * This method also makes sure `body` won't be interrupted, to work around a potential issue in
+ * `KafkaConsumer.poll`. (KAFKA-1894)
+ */
+ private def withRetriesWithoutInterrupt(
+ body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+ synchronized {
+ var result: Option[Map[TopicPartition, Long]] = None
+ var attempt = 1
+ var lastException: Throwable = null
+ while (result.isEmpty && attempt <= maxOffsetFetchAttempts
+ && !Thread.currentThread().isInterrupted) {
+ Thread.currentThread match {
+ case ut: UninterruptibleThread =>
+ // "KafkaConsumer.poll" may hang forever if the thread is interrupted (E.g., the query
+ // is stopped)(KAFKA-1894). Hence, we just make sure we don't interrupt it.
+ //
+ // If the broker addresses are wrong, or Kafka cluster is down, "KafkaConsumer.poll" may
+ // hang forever as well. This cannot be resolved in KafkaSource until Kafka fixes the
+ // issue.
+ ut.runUninterruptibly {
+ try {
+ result = Some(body)
+ } catch {
+ case NonFatal(e) =>
+ lastException = e
+ logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
+ attempt += 1
+ Thread.sleep(offsetFetchAttemptIntervalMs)
+ }
+ }
+ case _ =>
+ throw new IllegalStateException(
+ "Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread")
+ }
+ }
+ if (Thread.interrupted()) {
+ throw new InterruptedException()
+ }
+ if (result.isEmpty) {
+ assert(attempt > maxOffsetFetchAttempts)
+ assert(lastException != null)
+ throw lastException
+ }
+ result.get
+ }
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message +
+ ". Set the source option 'failOnDataLoss' to 'false' if you want to ignore these checks.")
+ } else {
+ logWarning(message)
+ }
+ }
+}
+
+
+/** Companion object for the [[KafkaSource]]. */
+private[kafka010] object KafkaSource {
+
+ def kafkaSchema: StructType = StructType(Seq(
+ StructField("key", BinaryType),
+ StructField("value", BinaryType),
+ StructField("topic", StringType),
+ StructField("partition", IntegerType),
+ StructField("offset", LongType),
+ StructField("timestamp", LongType),
+ StructField("timestampType", IntegerType)
+ ))
+
+ sealed trait ConsumerStrategy {
+ def createConsumer(): Consumer[Array[Byte], Array[Byte]]
+ }
+
+ case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
+ extends ConsumerStrategy {
+ override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ consumer.subscribe(topics.asJava)
+ consumer
+ }
+
+ override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
+ }
+
+ case class SubscribePatternStrategy(
+ topicPattern: String, kafkaParams: ju.Map[String, Object])
+ extends ConsumerStrategy {
+ override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ consumer.subscribe(
+ ju.regex.Pattern.compile(topicPattern),
+ new NoOpConsumerRebalanceListener())
+ consumer
+ }
+
+ override def toString: String = s"SubscribePattern[$topicPattern]"
+ }
+
+ private def getSortedExecutorList(sc: SparkContext): Array[String] = {
+ val bm = sc.env.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ private def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
+ }
+
+ private def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+}
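The partition-to-executor mapping described in the class doc above (look up `floorMod(tp.hashCode, numExecutors)` in the sorted executor list) can be illustrated standalone. The executor strings below are made-up placeholders; only the hashing logic mirrors the code in this file.

```Scala
import org.apache.kafka.common.TopicPartition

// Mirrors KafkaSource.floorMod: a non-negative modulus, so each TopicPartition maps to a
// stable index into the sorted executor list across batches.
def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b

val sortedExecutors = Array("host1_executor1", "host2_executor2", "host3_executor3") // placeholders
Seq(new TopicPartition("t", 0), new TopicPartition("t", 1), new TopicPartition("t", 2)).foreach { tp =>
  val preferredLoc = sortedExecutors(floorMod(tp.hashCode, sortedExecutors.length))
  println(s"$tp -> $preferredLoc") // the same partition always lands on the same executor
}
```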
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
new file mode 100644
index 0000000000..b5ade98251
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.execution.streaming.Offset
+
+/**
+ * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
+ * their offsets.
+ */
+private[kafka010]
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
+ override def toString(): String = {
+ partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
+ }
+}
+
+/** Companion object of the [[KafkaSourceOffset]] */
+private[kafka010] object KafkaSourceOffset {
+
+ def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
+ offset match {
+ case o: KafkaSourceOffset => o.partitionToOffsets
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
+ }
+ }
+
+ /**
+ * Returns [[KafkaSourceOffset]] from a variable sequence of (topic, partitionId, offset)
+ * tuples.
+ */
+ def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
+ KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
+ }
+}
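As a small illustration of the tuple-based `apply` defined above, and of the exclusive-offset convention described in KafkaSource's docs (the stored offset is 1 + the last available offset), consider this sketch with arbitrary topic names and offsets:

```Scala
import org.apache.kafka.common.TopicPartition

// If the last available record in partition t-0 is at offset 5, the stored offset is 6.
val offset = KafkaSourceOffset(("t", 0, 6L), ("t", 1, 3L))
assert(offset.partitionToOffsets == Map(
  new TopicPartition("t", 0) -> 6L,
  new TopicPartition("t", 1) -> 3L))
println(offset) // prints something like [(t-0,6), (t-1,3)]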
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
new file mode 100644
index 0000000000..1b0a2fe955
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
+ * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
+ * missing options even before the query is started.
+ */
+private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
+ with DataSourceRegister with Logging {
+
+ import KafkaSourceProvider._
+
+ /**
+ * Returns the name and schema of the source. In addition, it also verifies whether the options
+ * are correct and sufficient to create the [[KafkaSource]] when the query is started.
+ */
+ override def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
+ validateOptions(parameters)
+ ("kafka", KafkaSource.kafkaSchema)
+ }
+
+ override def createSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): Source = {
+ validateOptions(parameters)
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val specifiedKafkaParams =
+ parameters
+ .keySet
+ .filter(_.toLowerCase.startsWith("kafka."))
+ .map { k => k.drop(6).toString -> parameters(k) }
+ .toMap
+
+ val deserClassName = classOf[ByteArrayDeserializer].getName
+ // Each running query should use its own group id. Otherwise, the query may only be assigned
+ // partial data since Kafka will assign partitions to multiple consumers having the same group
+ // id. Hence, we should generate a unique id for each query.
+ val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+
+ val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
+ case Some(value) => value.trim() // same values as those supported by auto.offset.reset
+ case None => "latest"
+ }
+
+ val kafkaParamsForStrategy =
+ ConfigUpdater("source", specifiedKafkaParams)
+ .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
+ .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
+
+ // So that consumers in Kafka source do not mess with any existing group id
+ .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
+
+ // So that consumers can start from earliest or latest
+ .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+
+ // So that consumers in the driver do not commit offsets unnecessarily
+ .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
+ // So that the driver does not pull too much data
+ .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
+
+ // If buffer config is not set, set it to reasonable value to work around
+ // buffer issues (see KAFKA-3135)
+ .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+ .build()
+
+ val kafkaParamsForExecutors =
+ ConfigUpdater("executor", specifiedKafkaParams)
+ .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
+ .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
+
+ // Make sure executors do only what the driver tells them.
+ .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+
+ // So that consumers in executors do not mess with any existing group id
+ .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+
+ // So that consumers in executors do not commit offsets unnecessarily
+ .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
+ // If buffer config is not set, set it to reasonable value to work around
+ // buffer issues (see KAFKA-3135)
+ .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+ .build()
+
+ val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+ case ("subscribe", value) =>
+ SubscribeStrategy(
+ value.split(",").map(_.trim()).filter(_.nonEmpty),
+ kafkaParamsForStrategy)
+ case ("subscribepattern", value) =>
+ SubscribePatternStrategy(
+ value.trim(),
+ kafkaParamsForStrategy)
+ case _ =>
+ // Should never reach here as we are already matching on
+ // matched strategy names
+ throw new IllegalArgumentException("Unknown option")
+ }
+
+ val failOnDataLoss =
+ caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean
+
+ new KafkaSource(
+ sqlContext,
+ strategy,
+ kafkaParamsForExecutors,
+ parameters,
+ metadataPath,
+ failOnDataLoss)
+ }
+
+ private def validateOptions(parameters: Map[String, String]): Unit = {
+
+ // Validate source options
+
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val specifiedStrategies =
+ caseInsensitiveParams.filter { case (k, _) => STRATEGY_OPTION_KEYS.contains(k) }.toSeq
+ if (specifiedStrategies.isEmpty) {
+ throw new IllegalArgumentException(
+ "One of the following options must be specified for Kafka source: "
+ + STRATEGY_OPTION_KEYS.mkString(", ") + ". See the docs for more details.")
+ } else if (specifiedStrategies.size > 1) {
+ throw new IllegalArgumentException(
+ "Only one of the following options can be specified for Kafka source: "
+ + STRATEGY_OPTION_KEYS.mkString(", ") + ". See the docs for more details.")
+ }
+
+ val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+ case ("subscribe", value) =>
+ val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
+ if (topics.isEmpty) {
+ throw new IllegalArgumentException(
+ "No topics to subscribe to as specified value for option " +
+ s"'subscribe' is '$value'")
+ }
+ case ("subscribepattern", value) =>
+ val pattern = caseInsensitiveParams("subscribepattern").trim()
+ if (pattern.isEmpty) {
+ throw new IllegalArgumentException(
+ "Pattern to subscribe is empty as specified value for option " +
+ s"'subscribePattern' is '$value'")
+ }
+ case _ =>
+ // Should never reach here as we are already matching on
+ // matched strategy names
+ throw new IllegalArgumentException("Unknown option")
+ }
+
+ caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
+ case Some(pos) if !STARTING_OFFSET_OPTION_VALUES.contains(pos.trim.toLowerCase) =>
+ throw new IllegalArgumentException(
+ s"Illegal value '$pos' for option '$STARTING_OFFSET_OPTION_KEY', " +
+ s"acceptable values are: ${STARTING_OFFSET_OPTION_VALUES.mkString(", ")}")
+ case _ =>
+ }
+
+ // Validate user-specified Kafka options
+
+ if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
+ throw new IllegalArgumentException(
+ s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " +
+ s"user-specified consumer groups is not used to track offsets.")
+ }
+
+ if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}")) {
+ throw new IllegalArgumentException(
+ s"""
+ |Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
+ |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
+ |specify where to start. Structured Streaming manages which offsets are consumed
+ |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
+ |data is missed when new topics/partitions are dynamically subscribed. Note that
+ |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and
+ |that resuming will always pick up from where the query left off. See the docs for more
+ |details.
+ """.stripMargin)
+ }
+
+ if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}")) {
+ throw new IllegalArgumentException(
+ s"Kafka option '${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}' is not supported as keys "
+ + "are deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations "
+ + "to explicitly deserialize the keys.")
+ }
+
+ if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}"))
+ {
+ throw new IllegalArgumentException(
+ s"Kafka option '${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}' is not supported as "
+ + "value are deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame "
+ + "operations to explicitly deserialize the values.")
+ }
+
+ val otherUnsupportedConfigs = Seq(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, // committing correctly requires new APIs in Source
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG) // interceptors can modify payload, so not safe
+
+ otherUnsupportedConfigs.foreach { c =>
+ if (caseInsensitiveParams.contains(s"kafka.$c")) {
+ throw new IllegalArgumentException(s"Kafka option '$c' is not supported")
+ }
+ }
+
+ if (!caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}")) {
+ throw new IllegalArgumentException(
+ s"Option 'kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}' must be specified for " +
+ s"configuring Kafka consumer")
+ }
+ }
+
+ override def shortName(): String = "kafka"
+
+ /** Class to conveniently update Kafka config params, while logging the changes */
+ private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
+ private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
+
+ def set(key: String, value: Object): this.type = {
+ map.put(key, value)
+ logInfo(s"$module: Set $key to $value, earlier value: ${kafkaParams.get(key).getOrElse("")}")
+ this
+ }
+
+ def setIfUnset(key: String, value: Object): ConfigUpdater = {
+ if (!map.containsKey(key)) {
+ map.put(key, value)
+ logInfo(s"$module: Set $key to $value")
+ }
+ this
+ }
+
+ def build(): ju.Map[String, Object] = map
+ }
+}
+
+private[kafka010] object KafkaSourceProvider {
+ private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern")
+ private val STARTING_OFFSET_OPTION_KEY = "startingoffset"
+ private val STARTING_OFFSET_OPTION_VALUES = Set("earliest", "latest")
+ private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+}
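The option handling in `createSource` above (source options are matched case-insensitively, while `kafka.*` keys have the 6-character prefix stripped before being forwarded to the consumer) can be sketched in isolation; the option map below is a made-up example.

```Scala
// Hypothetical user options as they would arrive from DataStreamReader.option(...).
val parameters = Map(
  "subscribe" -> "topic1,topic2",
  "startingOffset" -> "earliest",
  "kafka.bootstrap.servers" -> "host:port")

// Source options are matched case-insensitively...
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }

// ...while "kafka."-prefixed keys are forwarded to the consumer with the prefix dropped.
val specifiedKafkaParams = parameters.keySet
  .filter(_.toLowerCase.startsWith("kafka."))
  .map { k => k.drop(6) -> parameters(k) }
  .toMap

assert(specifiedKafkaParams == Map("bootstrap.servers" -> "host:port"))
assert(caseInsensitiveParams("startingoffset") == "earliest")
```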
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
new file mode 100644
index 0000000000..496af7e39a
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+
+/** Offset range that one partition of the KafkaSourceRDD has to read */
+private[kafka010] case class KafkaSourceRDDOffsetRange(
+ topicPartition: TopicPartition,
+ fromOffset: Long,
+ untilOffset: Long,
+ preferredLoc: Option[String]) {
+ def topic: String = topicPartition.topic
+ def partition: Int = topicPartition.partition
+ def size: Long = untilOffset - fromOffset
+}
+
+
+/** Partition of the KafkaSourceRDD */
+private[kafka010] case class KafkaSourceRDDPartition(
+ index: Int, offsetRange: KafkaSourceRDDOffsetRange) extends Partition
+
+
+/**
+ * An RDD that reads data from Kafka based on offset ranges across multiple partitions.
+ * Additionally, it allows preferred locations to be set for each topic + partition, so that
+ * the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
+ * and cached KafkaConsumers (see [[CachedKafkaConsumer]]) can be used to read data efficiently.
+ *
+ * @param sc the [[SparkContext]]
+ * @param executorKafkaParams Kafka configuration for creating KafkaConsumer on the executors
+ * @param offsetRanges Offset ranges that define the Kafka data belonging to this RDD
+ */
+private[kafka010] class KafkaSourceRDD(
+ sc: SparkContext,
+ executorKafkaParams: ju.Map[String, Object],
+ offsetRanges: Seq[KafkaSourceRDDOffsetRange],
+ pollTimeoutMs: Long)
+ extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]](sc, Nil) {
+
+ override def persist(newLevel: StorageLevel): this.type = {
+ logError("Kafka ConsumerRecord is not serializable. " +
+ "Use .map to extract fields before calling .persist or .window")
+ super.persist(newLevel)
+ }
+
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
+ }
+
+ override def count(): Long = offsetRanges.map(_.size).sum
+
+ override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
+
+ override def isEmpty(): Boolean = count == 0L
+
+ override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ val nonEmptyPartitions =
+ this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0)
+
+ if (num < 1 || nonEmptyPartitions.isEmpty) {
+ return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0)
+ }
+
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.offsetRange.size)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+ val res = context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) =>
+ it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+ )
+ res.foreach(buf ++= _)
+ buf.toArray
+ }
+
+ override def compute(
+ thePart: Partition,
+ context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ val range = thePart.asInstanceOf[KafkaSourceRDDPartition].offsetRange
+ assert(
+ range.fromOffset <= range.untilOffset,
+ s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} " +
+ s"for topic ${range.topic} partition ${range.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
+ if (range.fromOffset == range.untilOffset) {
+ logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset " +
+ s"skipping ${range.topic} ${range.partition}")
+ Iterator.empty
+
+ } else {
+
+ val consumer = CachedKafkaConsumer.getOrCreate(
+ range.topic, range.partition, executorKafkaParams)
+ var requestOffset = range.fromOffset
+
+ logDebug(s"Creating iterator for $range")
+
+ new Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
+ override def hasNext(): Boolean = requestOffset < range.untilOffset
+ override def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ assert(hasNext(), "Can't call next() once untilOffset has been reached")
+ val r = consumer.get(requestOffset, pollTimeoutMs)
+ requestOffset += 1
+ r
+ }
+ }
+ }
+ }
+}
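To make the half-open offset-range convention concrete (an RDD partition reads offsets in `[fromOffset, untilOffset)`), here is a tiny sketch using the `KafkaSourceRDDOffsetRange` case class defined above; the topic and offsets are arbitrary.

```Scala
import org.apache.kafka.common.TopicPartition

// A range covering offsets 5, 6 and 7 of partition t-2; untilOffset is exclusive.
val range = KafkaSourceRDDOffsetRange(
  topicPartition = new TopicPartition("t", 2),
  fromOffset = 5L,
  untilOffset = 8L,
  preferredLoc = None)

assert(range.size == 3L) // untilOffset - fromOffset
assert(range.topic == "t" && range.partition == 2)
```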
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package-info.java b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package-info.java
new file mode 100644
index 0000000000..596f775c56
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Structured Streaming Data Source for Kafka 0.10
+ */
+package org.apache.spark.sql.kafka010;
diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties b/external/kafka-0-10-sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..75e3b53a09
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
new file mode 100644
index 0000000000..7056a41b17
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.sql.streaming.OffsetSuite
+
+class KafkaSourceOffsetSuite extends OffsetSuite {
+
+ compare(
+ one = KafkaSourceOffset(("t", 0, 1L)),
+ two = KafkaSourceOffset(("t", 0, 2L)))
+
+ compare(
+ one = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 0L)),
+ two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+ compare(
+ one = KafkaSourceOffset(("t", 0, 1L), ("T", 0, 0L)),
+ two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L)))
+
+ compare(
+ one = KafkaSourceOffset(("t", 0, 1L)),
+ two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
new file mode 100644
index 0000000000..64bf503058
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.scalatest.BeforeAndAfter
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override val streamingTimeout = 30.seconds
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ super.afterAll()
+ }
+ }
+
+ protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
+ // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
+ // its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
+ // we don't know which data should be fetched when `startingOffset` is latest.
+ q.processAllAvailable()
+ true
+ }
+
+ /**
+ * Add data to Kafka.
+ *
+ * `topicAction` can be used to run actions for each topic before inserting data.
+ */
+ case class AddKafkaData(topics: Set[String], data: Int*)
+ (implicit ensureDataInMultiplePartition: Boolean = false,
+ concurrent: Boolean = false,
+ message: String = "",
+ topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
+
+ override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+ if (query.get.isActive) {
+ // Make sure no Spark job is running when deleting a topic
+ query.get.processAllAvailable()
+ }
+
+ val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
+ val newTopics = topics.diff(existingTopics.keySet)
+ for (newTopic <- newTopics) {
+ topicAction(newTopic, None)
+ }
+ for (existingTopicPartitions <- existingTopics) {
+ topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
+ }
+
+ // Read all topics again in case some topics have been deleted.
+ val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys
+ require(
+ query.nonEmpty,
+ "Cannot add data when there is no query for finding the active kafka source")
+
+ val sources = query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] =>
+ source.asInstanceOf[KafkaSource]
+ }
+ if (sources.isEmpty) {
+ throw new Exception(
+ "Could not find Kafka source in the StreamExecution logical plan to add data to")
+ } else if (sources.size > 1) {
+ throw new Exception(
+ "Could not select the Kafka source in the StreamExecution logical plan as there" +
+ "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
+ }
+ val kafkaSource = sources.head
+ val topic = topics.toSeq(Random.nextInt(topics.size))
+ val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
+
+ def metadataToStr(m: (String, RecordMetadata)): String = {
+ s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
+ }
+ // Verify that the test data gets inserted into multiple partitions
+ if (ensureDataInMultiplePartition) {
+ require(
+ sentMetadata.groupBy(_._2.partition).size > 1,
+ s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
+ }
+
+ val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
+ logInfo(s"Added data, expected offset $offset")
+ (kafkaSource, offset)
+ }
+
+ override def toString: String =
+ s"AddKafkaData(topics = $topics, data = $data, message = $message)"
+ }
+}
+
+
+class KafkaSourceSuite extends KafkaSourceTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ test("cannot stop Kafka stream") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", "topic-.*")
+
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ StopStream
+ )
+ }
+
+ test("subscribing topic by name from latest offsets") {
+ val topic = newTopic()
+ testFromLatestOffsets(topic, "subscribe" -> topic)
+ }
+
+ test("subscribing topic by name from earliest offsets") {
+ val topic = newTopic()
+ testFromEarliestOffsets(topic, "subscribe" -> topic)
+ }
+
+ test("subscribing topic by pattern from latest offsets") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test("subscribing topic by pattern from earliest offsets") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test("subscribing topic by pattern with topic deletions") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-seems"
+ val topic2 = topicPrefix + "-bad"
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", s"$topicPrefix-.*")
+ .option("failOnDataLoss", "false")
+
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ Assert {
+ testUtils.deleteTopic(topic)
+ testUtils.createTopic(topic2, partitions = 5)
+ true
+ },
+ AddKafkaData(Set(topic2), 4, 5, 6),
+ CheckAnswer(2, 3, 4, 5, 6, 7)
+ )
+ }
+
+ test("bad source options") {
+ def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
+ val ex = intercept[IllegalArgumentException] {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ options.foreach { case (k, v) => reader.option(k, v) }
+ reader.load()
+ }
+ expectedMsgs.foreach { m =>
+ assert(ex.getMessage.toLowerCase.contains(m.toLowerCase))
+ }
+ }
+
+ // No strategy specified
+ testBadOptions()("options must be specified", "subscribe", "subscribePattern")
+
+ // Multiple strategies specified
+ testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
+ "only one", "options can be specified")
+
+ testBadOptions("subscribe" -> "")("no topics to subscribe")
+ testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+ }
+
+ test("unsupported kafka configs") {
+ def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
+ val ex = intercept[IllegalArgumentException] {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", "topic")
+ .option("kafka.bootstrap.servers", "somehost")
+ .option(key, value)
+ reader.load()
+ }
+ assert(ex.getMessage.toLowerCase.contains("not supported"))
+ }
+
+ testUnsupportedConfig("kafka.group.id")
+ testUnsupportedConfig("kafka.auto.offset.reset")
+ testUnsupportedConfig("kafka.enable.auto.commit")
+ testUnsupportedConfig("kafka.interceptor.classes")
+ testUnsupportedConfig("kafka.key.deserializer")
+ testUnsupportedConfig("kafka.value.deserializer")
+
+ testUnsupportedConfig("kafka.auto.offset.reset", "none")
+ testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
+ testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
+ testUnsupportedConfig("kafka.auto.offset.reset", "latest")
+ }
+
+ private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+ private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("startingOffset", "latest")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ StopStream,
+ StartStream(),
+ CheckAnswer(2, 3, 4), // Should get the data back on recovery
+ StopStream,
+ AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
+ AddKafkaData(Set(topic), 7, 8),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+ AssertOnQuery("Add partitions") { query: StreamExecution =>
+ testUtils.addPartitions(topic, 10)
+ true
+ },
+ AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+ )
+ }
+
+ private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark.readStream
+ reader
+ .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+ .option("startingOffset", "earliest")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ AddKafkaData(Set(topic), 4, 5, 6), // Add data while the query is running; 1, 2, 3 were sent before it started
+ CheckAnswer(2, 3, 4, 5, 6, 7),
+ StopStream,
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7),
+ StopStream,
+ AddKafkaData(Set(topic), 7, 8),
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+ AssertOnQuery("Add partitions") { query: StreamExecution =>
+ testUtils.addPartitions(topic, 10)
+ true
+ },
+ AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+ )
+ }
+}
+
+
+class KafkaSourceStressSuite extends KafkaSourceTest with BeforeAndAfter {
+
+ import testImplicits._
+
+ val topicId = new AtomicInteger(1)
+
+ @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
+
+ def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
+
+ // Returns a random Int in [start, end).
+ private def nextInt(start: Int, end: Int): Int = {
+ start + Random.nextInt(end - start)
+ }
+
+ after {
+ for (topic <- testUtils.getAllTopicsAndPartitionSize().toMap.keys) {
+ testUtils.deleteTopic(topic)
+ }
+ }
+
+ test("stress test with multiple topics and partitions") {
+ topics.foreach { topic =>
+ testUtils.createTopic(topic, partitions = nextInt(1, 6))
+ testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+ }
+
+ // Create Kafka source that reads from latest offset
+ val kafka =
+ spark.readStream
+ .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", "stress.*")
+ .option("failOnDataLoss", "false")
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ runStressTest(
+ mapped,
+ Seq(makeSureGetOffsetCalled),
+ (d, running) => {
+ Random.nextInt(5) match {
+ case 0 => // Add a new topic
+ val addedTopic = newStressTopic
+ topics = topics ++ Seq(addedTopic)
+ AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $addedTopic",
+ topicAction = (topic, partition) => {
+ if (partition.isEmpty) {
+ testUtils.createTopic(topic, partitions = nextInt(1, 6))
+ }
+ })
+ case 1 if running =>
+ // Only delete a topic when the query is running. Otherwise, we may lose data and
+ // will not be able to check correctness.
+ val deletedTopic = topics(Random.nextInt(topics.size))
+ if (deletedTopic != topics.head) {
+ topics = topics.filterNot(_ == deletedTopic)
+ }
+ AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
+ topicAction = (topic, partition) => {
+ // Never remove the first topic to make sure we have at least one topic
+ if (topic == deletedTopic && deletedTopic != topics.head) {
+ testUtils.deleteTopic(deletedTopic)
+ }
+ })
+ case 2 => // Add new partitions
+ AddKafkaData(topics.toSet, d: _*)(message = "Add partitions",
+ topicAction = (topic, partition) => {
+ testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
+ })
+ case _ => // Just add new data
+ AddKafkaData(topics.toSet, d: _*)
+ }
+ },
+ iterations = 50)
+ }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000000..3eb8a737ba
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * This is a helper class for Kafka test suites. It provides the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put the Kafka test utility class in src is to test Python-related Kafka APIs.
+ */
+class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkUtils: ZkUtils = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 0
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: Producer[String, String] = _
+
+ // Flags to check whether the system has been correctly started
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkUtils = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkUtils).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ brokerPort = server.boundPort()
+ (server, brokerPort)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** Set up the embedded servers, including Zookeeper and the Kafka broker */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Tear down all servers, including the Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
+
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+ if (zkUtils != null) {
+ zkUtils.close()
+ zkUtils = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p =>
+ waitUntilMetadataIsPropagated(topic, p)
+ }
+ }
+
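+ /** Return all topics, along with the number of partitions for each topic */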
+ def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
+ zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq
+ }
+
+ /** Create a single-partition Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String): Unit = {
+ createTopic(topic, 1)
+ }
+
+ /** Delete a Kafka topic and wait until it is propagated to the whole cluster */
+ def deleteTopic(topic: String): Unit = {
+ val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
+ AdminUtils.deleteTopic(zkUtils, topic)
+ verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+ }
+
+ /** Add new partitions to a Kafka topic */
+ def addPartitions(topic: String, partitions: Int): Unit = {
+ AdminUtils.addPartitions(zkUtils, topic, partitions)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p =>
+ waitUntilMetadataIsPropagated(topic, p)
+ }
+ }
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+ producer = new KafkaProducer[String, String](producerConfiguration)
+ val offsets = try {
+ messages.map { m =>
+ val metadata =
+ producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+ logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}")
+ (m, metadata)
+ }
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ offsets
+ }
+
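+ /** Return the latest (end) offset of every partition of the given topics */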
+ def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
+ val kc = new KafkaConsumer[String, String](consumerConfiguration)
+ logInfo("Created consumer to get latest offsets")
+ kc.subscribe(topics.asJavaCollection)
+ kc.poll(0)
+ val partitions = kc.assignment()
+ kc.pause(partitions)
+ kc.seekToEnd(partitions)
+ val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
+ kc.close()
+ logInfo("Closed consumer to get latest offsets")
+ offsets
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("advertised.host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props.put("delete.topic.enable", "true")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("bootstrap.servers", brokerAddress)
+ props.put("value.serializer", classOf[StringSerializer].getName)
+ props.put("key.serializer", classOf[StringSerializer].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("acks", "all")
+ props
+ }
+
+ private def consumerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("bootstrap.servers", brokerAddress)
+ props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt)
+ props.put("value.deserializer", classOf[StringDeserializer].getName)
+ props.put("key.deserializer", classOf[StringDeserializer].getName)
+ props.put("enable.auto.commit", "false")
+ props
+ }
+
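+ /** Wait until the given topic has been fully removed from Zookeeper and all brokers */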
+ private def verifyTopicDeletion(
+ zkUtils: ZkUtils,
+ topic: String,
+ numPartitions: Int,
+ servers: Seq[KafkaServer]) {
+ import ZkUtils._
+ val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+ def isDeleted(): Boolean = {
+ // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+ val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
+ val topicPath = !zkUtils.pathExists(getTopicPath(topic))
+ // ensure that the topic-partition has been deleted from all brokers' replica managers
+ val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
+ server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty))
+ // ensure that logs from all replicas are deleted if delete topic is marked successful
+ val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
+ server.getLogManager().getLog(tp).isEmpty))
+ // ensure that topic is removed from all cleaner offsets
+ val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
+ val checkpoints = server.getLogManager().logDirs.map { logDir =>
+ new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+ }
+ checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+ })
+ deletePath && topicPath && replicaManager && logManager && cleaner
+ }
+ eventually(timeout(10.seconds)) {
+ assert(isDeleted, s"$topic not deleted after timeout")
+ }
+ }
+
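+ /** Wait until metadata (a valid leader and non-empty ISR) for the given topic partition is propagated */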
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(timeout(10.seconds)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
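+ /** A minimal embedded Zookeeper server backed by temporary snapshot and log directories */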
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ Utils.deleteRecursively(snapshotDir)
+ Utils.deleteRecursively(logDir)
+ }
+ }
+}
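+
+// A minimal usage sketch (illustrative only, not part of the patch): the suites above drive
+// KafkaTestUtils roughly as follows; the topic name "demo-topic" is made up for the example.
+//
+//   val testUtils = new KafkaTestUtils
+//   testUtils.setup()                                    // start embedded Zookeeper and Kafka broker
+//   testUtils.createTopic("demo-topic", partitions = 3)  // create the topic and wait for metadata
+//   testUtils.sendMessages("demo-topic", Array("1", "2", "3"))
+//   val latest = testUtils.getLatestOffsets(Set("demo-topic"))
+//   testUtils.teardown()                                 // stop the broker and Zookeeper, delete temp dirs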
+