path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala')
1 files changed, 41 insertions, 282 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 8e6c93e672..02b23111af 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -21,11 +21,6 @@ import java.{util => ju}
import java.io._
import java.nio.charset.StandardCharsets
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
@@ -38,11 +33,9 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.UninterruptibleThread
- * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design
- * for this source is as follows.
+ * A [[Source]] that reads data from Kafka using the following design.
* - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
* a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
@@ -50,20 +43,14 @@ import org.apache.spark.util.UninterruptibleThread
* KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent
* with the semantics of `KafkaConsumer.position()`.
- * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
- * by this source. These strategies directly correspond to the different consumption options
- * in . This class is designed to return a configured [[KafkaConsumer]] that is used by the
- * [[KafkaSource]] to query for the offsets. See the docs on
- * [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for more details.
- *
* - The [[KafkaSource]] written to do the following.
- * - As soon as the source is created, the pre-configured KafkaConsumer returned by the
- * [[ConsumerStrategy]] is used to query the initial offsets that this source should
- * start reading from. This used to create the first batch.
+ * - As soon as the source is created, the pre-configured [[KafkaOffsetReader]]
+ * is used to query the initial offsets that this source should
+ * start reading from. This is used to create the first batch.
- * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are
- * returned as a [[KafkaSourceOffset]].
+ * - `getOffset()` uses the [[KafkaOffsetReader]] to query the latest
+ * available offsets, which are returned as a [[KafkaSourceOffset]].
* - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in
* for each partition. The end offset is excluded to be consistent with the semantics of
@@ -82,15 +69,13 @@ import org.apache.spark.util.UninterruptibleThread
* and not use wrong broker addresses.
private[kafka010] class KafkaSource(
- sqlContext: SQLContext,
- consumerStrategy: ConsumerStrategy,
- driverKafkaParams: ju.Map[String, Object],
- executorKafkaParams: ju.Map[String, Object],
- sourceOptions: Map[String, String],
- metadataPath: String,
- startingOffsets: StartingOffsets,
- failOnDataLoss: Boolean,
- driverGroupIdPrefix: String)
+ sqlContext: SQLContext,
+ kafkaReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ sourceOptions: Map[String, String],
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
extends Source with Logging {
private val sc = sqlContext.sparkContext
@@ -100,41 +85,9 @@ private[kafka010] class KafkaSource(
sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
- private val maxOffsetFetchAttempts =
- sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
- private val offsetFetchAttemptIntervalMs =
- sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
private val maxOffsetsPerTrigger =
- private var groupId: String = null
- private var nextId = 0
- private def nextGroupId(): String = {
- groupId = driverGroupIdPrefix + "-" + nextId
- nextId += 1
- groupId
- }
- /**
- * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
- * offsets and never commits them.
- */
- private var consumer: Consumer[Array[Byte], Array[Byte]] = createConsumer()
- /**
- * Create a consumer using the new generated group id. We always use a new consumer to avoid
- * just using a broken consumer to retry on Kafka errors, which likely will fail again.
- */
- private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
- val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
- newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
- consumerStrategy.createConsumer(newKafkaParams)
- }
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -159,9 +112,9 @@ private[kafka010] class KafkaSource(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
- case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
- case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
- case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
+ case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
@@ -169,16 +122,31 @@ private[kafka010] class KafkaSource(
+ private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
+ val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+ specificOffsets.foreach {
+ case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+ off != KafkaOffsetRangeLimit.EARLIEST =>
+ if (result(tp) != off) {
+ reportDataLoss(
+ s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+ }
+ case _ =>
+ // no real way to check that beginning or end is reasonable
+ }
+ KafkaSourceOffset(result)
+ }
private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
- override def schema: StructType = KafkaSource.kafkaSchema
+ override def schema: StructType = KafkaOffsetReader.kafkaSchema
/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
// Make sure initialPartitionOffsets is initialized
- val latest = fetchLatestOffsets()
+ val latest = kafkaReader.fetchLatestOffsets()
val offsets = maxOffsetsPerTrigger match {
case None =>
@@ -193,17 +161,12 @@ private[kafka010] class KafkaSource(
- private def resetConsumer(): Unit = synchronized {
- consumer.close()
- consumer = createConsumer()
- }
/** Proportionally distribute limit number of offsets among topicpartitions */
private def rateLimit(
limit: Long,
from: Map[TopicPartition, Long],
until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
- val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
@@ -253,7 +216,7 @@ private[kafka010] class KafkaSource(
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
- val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
+ val newPartitionOffsets = kafkaReader.fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -311,7 +274,8 @@ private[kafka010] class KafkaSource(
// Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
val rdd = new KafkaSourceRDD(
- sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
+ sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss,
+ reuseKafkaConsumer = true).map { cr =>
@@ -335,163 +299,10 @@ private[kafka010] class KafkaSource(
/** Stop this source and free any resources it has allocated. */
override def stop(): Unit = synchronized {
- consumer.close()
+ kafkaReader.close()
- override def toString(): String = s"KafkaSource[$consumerStrategy]"
- /**
- * Set consumer position to specified offsets, making sure all assignments are set.
- */
- private def fetchSpecificStartingOffsets(
- partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
- val result = withRetriesWithoutInterrupt {
- // Poll to get the latest assigned partitions
- consumer.poll(0)
- val partitions = consumer.assignment()
- consumer.pause(partitions)
- assert(partitions.asScala == partitionOffsets.keySet,
- "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
- "Use -1 for latest, -2 for earliest, if you don't care.\n" +
- s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
- partitionOffsets.foreach {
- case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
- case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
- case (tp, off) => consumer.seek(tp, off)
- }
- partitionOffsets.map {
- case (tp, _) => tp -> consumer.position(tp)
- }
- }
- partitionOffsets.foreach {
- case (tp, off) if off != -1 && off != -2 =>
- if (result(tp) != off) {
- reportDataLoss(
- s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
- }
- case _ =>
- // no real way to check that beginning or end is reasonable
- }
- result
- }
- /**
- * Fetch the earliest offsets of partitions.
- */
- private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
- // Poll to get the latest assigned partitions
- consumer.poll(0)
- val partitions = consumer.assignment()
- consumer.pause(partitions)
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")
- consumer.seekToBeginning(partitions)
- val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got earliest offsets for partition : $partitionOffsets")
- partitionOffsets
- }
- /**
- * Fetch the latest offset of partitions.
- */
- private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
- // Poll to get the latest assigned partitions
- consumer.poll(0)
- val partitions = consumer.assignment()
- consumer.pause(partitions)
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
- consumer.seekToEnd(partitions)
- val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got latest offsets for partition : $partitionOffsets")
- partitionOffsets
- }
- /**
- * Fetch the earliest offsets for newly discovered partitions. The return result may not contain
- * some partitions if they are deleted.
- */
- private def fetchNewPartitionEarliestOffsets(
- newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
- if (newPartitions.isEmpty) {
- Map.empty[TopicPartition, Long]
- } else {
- withRetriesWithoutInterrupt {
- // Poll to get the latest assigned partitions
- consumer.poll(0)
- val partitions = consumer.assignment()
- consumer.pause(partitions)
- logDebug(s"\tPartitions assigned to consumer: $partitions")
- // Get the earliest offset of each partition
- consumer.seekToBeginning(partitions)
- val partitionOffsets = newPartitions.filter { p =>
- // When deleting topics happen at the same time, some partitions may not be in
- // `partitions`. So we need to ignore them
- partitions.contains(p)
- }.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
- partitionOffsets
- }
- }
- /**
- * Helper function that does multiple retries on the a body of code that returns offsets.
- * Retries are needed to handle transient failures. For e.g. race conditions between getting
- * assignment and getting position while topics/partitions are deleted can cause NPEs.
- *
- * This method also makes sure `body` won't be interrupted to workaround a potential issue in
- * `KafkaConsumer.poll`. (KAFKA-1894)
- */
- private def withRetriesWithoutInterrupt(
- body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
- // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
- assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
- synchronized {
- var result: Option[Map[TopicPartition, Long]] = None
- var attempt = 1
- var lastException: Throwable = null
- while (result.isEmpty && attempt <= maxOffsetFetchAttempts
- && !Thread.currentThread().isInterrupted) {
- Thread.currentThread match {
- case ut: UninterruptibleThread =>
- // "KafkaConsumer.poll" may hang forever if the thread is interrupted (E.g., the query
- // is stopped)(KAFKA-1894). Hence, we just make sure we don't interrupt it.
- //
- // If the broker addresses are wrong, or Kafka cluster is down, "KafkaConsumer.poll" may
- // hang forever as well. This cannot be resolved in KafkaSource until Kafka fixes the
- // issue.
- ut.runUninterruptibly {
- try {
- result = Some(body)
- } catch {
- case NonFatal(e) =>
- lastException = e
- logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
- attempt += 1
- Thread.sleep(offsetFetchAttemptIntervalMs)
- resetConsumer()
- }
- }
- case _ =>
- throw new IllegalStateException(
- "Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread")
- }
- }
- if (Thread.interrupted()) {
- throw new InterruptedException()
- }
- if (result.isEmpty) {
- assert(attempt > maxOffsetFetchAttempts)
- assert(lastException != null)
- throw lastException
- }
- result.get
- }
- }
+ override def toString(): String = s"KafkaSource[$kafkaReader]"
* If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
@@ -506,10 +317,8 @@ private[kafka010] class KafkaSource(
/** Companion object for the [[KafkaSource]]. */
private[kafka010] object KafkaSource {
|Some data may have been lost because they are not available in Kafka any more; either the
@@ -526,57 +335,7 @@ private[kafka010] object KafkaSource {
| source option "failOnDataLoss" to "false".
- def kafkaSchema: StructType = StructType(Seq(
- StructField("key", BinaryType),
- StructField("value", BinaryType),
- StructField("topic", StringType),
- StructField("partition", IntegerType),
- StructField("offset", LongType),
- StructField("timestamp", TimestampType),
- StructField("timestampType", IntegerType)
- ))
- sealed trait ConsumerStrategy {
- def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
- }
- case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
- override def createConsumer(
- kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
- consumer.assign(ju.Arrays.asList(partitions: _*))
- consumer
- }
- override def toString: String = s"Assign[${partitions.mkString(", ")}]"
- }
- case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
- override def createConsumer(
- kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
- consumer.subscribe(topics.asJava)
- consumer
- }
- override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
- }
- case class SubscribePatternStrategy(topicPattern: String)
- extends ConsumerStrategy {
- override def createConsumer(
- kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
- consumer.subscribe(
- ju.regex.Pattern.compile(topicPattern),
- new NoOpConsumerRebalanceListener())
- consumer
- }
- override def toString: String = s"SubscribePattern[$topicPattern]"
- }
- private def getSortedExecutorList(sc: SparkContext): Array[String] = {
+ def getSortedExecutorList(sc: SparkContext): Array[String] = {
val bm = sc.env.blockManager
.map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
@@ -588,5 +347,5 @@ private[kafka010] object KafkaSource {
if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
- private def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+ def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b