Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 323 |
1 file changed, 41 insertions, 282 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 8e6c93e672..02b23111af 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -21,11 +21,6 @@ import java.{util => ju}
 import java.io._
 import java.nio.charset.StandardCharsets
 
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkContext
@@ -38,11 +33,9 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.UninterruptibleThread
 
 /**
- * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design
- * for this source is as follows.
+ * A [[Source]] that reads data from Kafka using the following design.
  *
  * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
  *   a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
@@ -50,20 +43,14 @@ import org.apache.spark.util.UninterruptibleThread
  *   KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent
  *   with the semantics of `KafkaConsumer.position()`.
  *
- * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
- *   by this source. These strategies directly correspond to the different consumption options
- *   in . This class is designed to return a configured [[KafkaConsumer]] that is used by the
- *   [[KafkaSource]] to query for the offsets. See the docs on
- *   [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for more details.
- *
  * - The [[KafkaSource]] written to do the following.
  *
- *  - As soon as the source is created, the pre-configured KafkaConsumer returned by the
- *    [[ConsumerStrategy]] is used to query the initial offsets that this source should
- *    start reading from. This used to create the first batch.
+ *  - As soon as the source is created, the pre-configured [[KafkaOffsetReader]]
+ *    is used to query the initial offsets that this source should
+ *    start reading from. This is used to create the first batch.
  *
- *  - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are
- *    returned as a [[KafkaSourceOffset]].
+ *  - `getOffset()` uses the [[KafkaOffsetReader]] to query the latest
+ *    available offsets, which are returned as a [[KafkaSourceOffset]].
  *
  *  - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in
  *    for each partition. The end offset is excluded to be consistent with the semantics of
@@ -82,15 +69,13 @@ import org.apache.spark.util.UninterruptibleThread
  *    and not use wrong broker addresses.
  */
 private[kafka010] class KafkaSource(
-    sqlContext: SQLContext,
-    consumerStrategy: ConsumerStrategy,
-    driverKafkaParams: ju.Map[String, Object],
-    executorKafkaParams: ju.Map[String, Object],
-    sourceOptions: Map[String, String],
-    metadataPath: String,
-    startingOffsets: StartingOffsets,
-    failOnDataLoss: Boolean,
-    driverGroupIdPrefix: String)
+    sqlContext: SQLContext,
+    kafkaReader: KafkaOffsetReader,
+    executorKafkaParams: ju.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    startingOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
   extends Source with Logging {
 
   private val sc = sqlContext.sparkContext
@@ -100,41 +85,9 @@ private[kafka010] class KafkaSource(
     sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
   ).toLong
 
-  private val maxOffsetFetchAttempts =
-    sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
-
-  private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
-
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
 
-  private var groupId: String = null
-
-  private var nextId = 0
-
-  private def nextGroupId(): String = {
-    groupId = driverGroupIdPrefix + "-" + nextId
-    nextId += 1
-    groupId
-  }
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
-  private var consumer: Consumer[Array[Byte], Array[Byte]] = createConsumer()
-
-  /**
-   * Create a consumer using the new generated group id. We always use a new consumer to avoid
-   * just using a broken consumer to retry on Kafka errors, which likely will fail again.
-   */
-  private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
-    val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-    consumerStrategy.createConsumer(newKafkaParams)
-  }
-
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -159,9 +112,9 @@ private[kafka010] class KafkaSource(
 
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
-        case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
-        case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
-        case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
+        case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
       }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
@@ -169,16 +122,31 @@ private[kafka010] class KafkaSource(
     }.partitionToOffsets
   }
 
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
+    val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+    specificOffsets.foreach {
+      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+          off != KafkaOffsetRangeLimit.EARLIEST =>
+        if (result(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+    KafkaSourceOffset(result)
+  }
+
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
 
-  override def schema: StructType = KafkaSource.kafkaSchema
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val latest = fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets()
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest
@@ -193,17 +161,12 @@ private[kafka010] class KafkaSource(
     Some(KafkaSourceOffset(offsets))
   }
 
-  private def resetConsumer(): Unit = synchronized {
-    consumer.close()
-    consumer = createConsumer()
-  }
-
   /** Proportionally distribute limit number of offsets among topicpartitions */
   private def rateLimit(
       limit: Long,
       from: Map[TopicPartition, Long],
       until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
-    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
     val sizes = until.flatMap {
       case (tp, end) =>
         // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
@@ -253,7 +216,7 @@ private[kafka010] class KafkaSource(
 
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
+    val newPartitionOffsets = kafkaReader.fetchEarliestOffsets(newPartitions.toSeq)
     if (newPartitionOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -311,7 +274,8 @@ private[kafka010] class KafkaSource(
 
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val rdd = new KafkaSourceRDD(
-      sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
+      sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss,
+      reuseKafkaConsumer = true).map { cr =>
       InternalRow(
         cr.key,
         cr.value,
@@ -335,163 +299,10 @@ private[kafka010] class KafkaSource(
 
   /** Stop this source and free any resources it has allocated. */
   override def stop(): Unit = synchronized {
-    consumer.close()
+    kafkaReader.close()
   }
 
-  override def toString(): String = s"KafkaSource[$consumerStrategy]"
-
-  /**
-   * Set consumer position to specified offsets, making sure all assignments are set.
-   */
-  private def fetchSpecificStartingOffsets(
-      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
-    val result = withRetriesWithoutInterrupt {
-      // Poll to get the latest assigned partitions
-      consumer.poll(0)
-      val partitions = consumer.assignment()
-      consumer.pause(partitions)
-      assert(partitions.asScala == partitionOffsets.keySet,
-        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
-          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
-          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
-
-      partitionOffsets.foreach {
-        case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
-        case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
-        case (tp, off) => consumer.seek(tp, off)
-      }
-      partitionOffsets.map {
-        case (tp, _) => tp -> consumer.position(tp)
-      }
-    }
-    partitionOffsets.foreach {
-      case (tp, off) if off != -1 && off != -2 =>
-        if (result(tp) != off) {
-          reportDataLoss(
-            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
-        }
-      case _ =>
-        // no real way to check that beginning or end is reasonable
-    }
-    result
-  }
-
-  /**
-   * Fetch the earliest offsets of partitions.
-   */
-  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Poll to get the latest assigned partitions
-    consumer.poll(0)
-    val partitions = consumer.assignment()
-    consumer.pause(partitions)
-    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")
-
-    consumer.seekToBeginning(partitions)
-    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got earliest offsets for partition : $partitionOffsets")
-    partitionOffsets
-  }
-
-  /**
-   * Fetch the latest offset of partitions.
-   */
-  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Poll to get the latest assigned partitions
-    consumer.poll(0)
-    val partitions = consumer.assignment()
-    consumer.pause(partitions)
-    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
-
-    consumer.seekToEnd(partitions)
-    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got latest offsets for partition : $partitionOffsets")
-    partitionOffsets
-  }
-
-  /**
-   * Fetch the earliest offsets for newly discovered partitions. The return result may not contain
-   * some partitions if they are deleted.
-   */
-  private def fetchNewPartitionEarliestOffsets(
-      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
-    if (newPartitions.isEmpty) {
-      Map.empty[TopicPartition, Long]
-    } else {
-      withRetriesWithoutInterrupt {
-        // Poll to get the latest assigned partitions
-        consumer.poll(0)
-        val partitions = consumer.assignment()
-        consumer.pause(partitions)
-        logDebug(s"\tPartitions assigned to consumer: $partitions")
-
-        // Get the earliest offset of each partition
-        consumer.seekToBeginning(partitions)
-        val partitionOffsets = newPartitions.filter { p =>
-          // When deleting topics happen at the same time, some partitions may not be in
-          // `partitions`. So we need to ignore them
-          partitions.contains(p)
-        }.map(p => p -> consumer.position(p)).toMap
-        logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
-        partitionOffsets
-      }
-    }
-
-  /**
-   * Helper function that does multiple retries on the a body of code that returns offsets.
-   * Retries are needed to handle transient failures. For e.g. race conditions between getting
-   * assignment and getting position while topics/partitions are deleted can cause NPEs.
-   *
-   * This method also makes sure `body` won't be interrupted to workaround a potential issue in
-   * `KafkaConsumer.poll`. (KAFKA-1894)
-   */
-  private def withRetriesWithoutInterrupt(
-      body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
-
-    synchronized {
-      var result: Option[Map[TopicPartition, Long]] = None
-      var attempt = 1
-      var lastException: Throwable = null
-      while (result.isEmpty && attempt <= maxOffsetFetchAttempts
-        && !Thread.currentThread().isInterrupted) {
-        Thread.currentThread match {
-          case ut: UninterruptibleThread =>
-            // "KafkaConsumer.poll" may hang forever if the thread is interrupted (E.g., the query
-            // is stopped)(KAFKA-1894). Hence, we just make sure we don't interrupt it.
-            //
-            // If the broker addresses are wrong, or Kafka cluster is down, "KafkaConsumer.poll" may
-            // hang forever as well. This cannot be resolved in KafkaSource until Kafka fixes the
-            // issue.
-            ut.runUninterruptibly {
-              try {
-                result = Some(body)
-              } catch {
-                case NonFatal(e) =>
-                  lastException = e
-                  logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
-                  attempt += 1
-                  Thread.sleep(offsetFetchAttemptIntervalMs)
-                  resetConsumer()
-              }
-            }
-          case _ =>
-            throw new IllegalStateException(
-              "Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread")
-        }
-      }
-      if (Thread.interrupted()) {
-        throw new InterruptedException()
-      }
-      if (result.isEmpty) {
-        assert(attempt > maxOffsetFetchAttempts)
-        assert(lastException != null)
-        throw lastException
-      }
-      result.get
-    }
-  }
+  override def toString(): String = s"KafkaSource[$kafkaReader]"
 
   /**
    * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
@@ -506,10 +317,8 @@ private[kafka010] class KafkaSource(
   }
 }
 
-/** Companion object for the [[KafkaSource]]. */
 private[kafka010] object KafkaSource {
-
   val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
     """
      |Some data may have been lost because they are not available in Kafka any more; either the
@@ -526,57 +335,7 @@ private[kafka010] object KafkaSource {
      | source option "failOnDataLoss" to "false".
    """.stripMargin
 
-  def kafkaSchema: StructType = StructType(Seq(
-    StructField("key", BinaryType),
-    StructField("value", BinaryType),
-    StructField("topic", StringType),
-    StructField("partition", IntegerType),
-    StructField("offset", LongType),
-    StructField("timestamp", TimestampType),
-    StructField("timestampType", IntegerType)
-  ))
-
-  sealed trait ConsumerStrategy {
-    def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
-  }
-
-  case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
-    override def createConsumer(
-        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
-      consumer.assign(ju.Arrays.asList(partitions: _*))
-      consumer
-    }
-
-    override def toString: String = s"Assign[${partitions.mkString(", ")}]"
-  }
-
-  case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
-    override def createConsumer(
-        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
-      consumer.subscribe(topics.asJava)
-      consumer
-    }
-
-    override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
-  }
-
-  case class SubscribePatternStrategy(topicPattern: String)
-    extends ConsumerStrategy {
-    override def createConsumer(
-        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
-      consumer.subscribe(
-        ju.regex.Pattern.compile(topicPattern),
-        new NoOpConsumerRebalanceListener())
-      consumer
-    }
-
-    override def toString: String = s"SubscribePattern[$topicPattern]"
-  }
-
-  private def getSortedExecutorList(sc: SparkContext): Array[String] = {
+  def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
     bm.master.getPeers(bm.blockManagerId).toArray
      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
@@ -588,5 +347,5 @@ private[kafka010] object KafkaSource {
      if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
   }
 
-  private def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+  def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
 }