Diffstat (limited to 'external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala | 318
1 file changed, 318 insertions, 0 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..acd1841d53
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A DStream where each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>.
+ * Requires "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ * see [[ConsumerStrategy]] for more details.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+ _ssc: StreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V]
+ ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
+
+ val executorKafkaParams = {
+ val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
+ KafkaUtils.fixKafkaParams(ekp)
+ ekp
+ }
+
+ protected var currentOffsets = Map[TopicPartition, Long]()
+
+ @transient private var kc: Consumer[K, V] = null
+ def consumer(): Consumer[K, V] = this.synchronized {
+ if (null == kc) {
+ kc = consumerStrategy.onStart(currentOffsets)
+ }
+ kc
+ }
+
+ override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+ logError("Kafka ConsumerRecord is not serializable. " +
+ "Use .map to extract fields before calling .persist or .window")
+ super.persist(newLevel)
+ }
+
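+ /**
+ * Look up the Kafka leader host for each currently assigned topic/partition, so that the
+ * PreferBrokers location strategy can prefer executors running on those hosts.
+ */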
+ protected def getBrokers: ju.Map[TopicPartition, String] = {
+ val c = consumer
+ val result = new ju.HashMap[TopicPartition, String]()
+ val hosts = new ju.HashMap[TopicPartition, String]()
+ val assignments = c.assignment().iterator()
+ while (assignments.hasNext()) {
+ val tp: TopicPartition = assignments.next()
+ if (null == hosts.get(tp)) {
+ val infos = c.partitionsFor(tp.topic).iterator()
+ while (infos.hasNext()) {
+ val i = infos.next()
+ hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+ }
+ }
+ result.put(tp, hosts.get(tp))
+ }
+ result
+ }
+
+ protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
+ locationStrategy match {
+ case PreferBrokers => getBrokers
+ case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+ case PreferFixed(hostMap) => hostMap
+ }
+ }
+
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+
+ /**
+ * A RateController used when backpressure is enabled; maxMessagesPerPartition reads its
+ * latest rate estimate to bound how many messages each batch ingests.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+
+ protected[streaming] def maxMessagesPerPartition(
+ offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+ // calculate a per-partition rate limit based on current lag
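+ // e.g. with an estimated rate of 1000 msgs/sec and two partitions lagging by 300 and 700
+ // messages, the partitions get limits of 300 and 700 msgs/sec respectively, each capped
+ // by spark.streaming.kafka.maxRatePerPartition when that is set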
+ val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+ case Some(rate) =>
+ val lagPerPartition = offsets.map { case (tp, offset) =>
+ tp -> Math.max(offset - currentOffsets(tp), 0)
+ }
+ val totalLag = lagPerPartition.values.sum
+
+ lagPerPartition.map { case (tp, lag) =>
+ val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ tp -> (if (maxRateLimitPerPartition > 0) {
+ Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+ }
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ }
+
+ if (effectiveRateLimitPerPartition.values.sum > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some(effectiveRateLimitPerPartition.map {
+ case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ })
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Returns the latest (highest) available offsets, taking new partitions into account.
+ */
+ protected def latestOffsets(): Map[TopicPartition, Long] = {
+ val c = consumer
+ c.poll(0)
+ val parts = c.assignment().asScala
+
+ // make sure new partitions are reflected in currentOffsets
+ val newPartitions = parts.diff(currentOffsets.keySet)
+ // position for new partitions determined by auto.offset.reset if no commit
+ currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+ // don't want to consume messages, so pause
+ c.pause(newPartitions.asJava)
+ // find latest available offsets
+ c.seekToEnd(currentOffsets.keySet.asJava)
+ parts.map(tp => tp -> c.position(tp)).toMap
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+
+ maxMessagesPerPartition(offsets).map { mmp =>
+ mmp.map { case (tp, messages) =>
+ val uo = offsets(tp)
+ tp -> Math.min(currentOffsets(tp) + messages, uo)
+ }
+ }.getOrElse(offsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
+ val untilOffsets = clamp(latestOffsets())
+ val offsetRanges = untilOffsets.map { case (tp, uo) =>
+ val fo = currentOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo)
+ }
+ val rdd = new KafkaRDD[K, V](
+ context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+
+ // Report the number of records and metadata of this batch interval to InputInfoTracker.
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to an immutable.List to prevent it from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+ currentOffsets = untilOffsets
+ commitAll()
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ val c = consumer
+ c.poll(0)
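+ // poll(0) only ensures the driver consumer has its partition assignment and initial
+ // positions; all partitions are paused below, so no records are meant to be processed here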
+ if (currentOffsets.isEmpty) {
+ currentOffsets = c.assignment().asScala.map { tp =>
+ tp -> c.position(tp)
+ }.toMap
+ }
+
+ // don't actually want to consume any messages, so pause all partitions
+ c.pause(currentOffsets.keySet.asJava)
+ }
+
+ override def stop(): Unit = this.synchronized {
+ if (kc != null) {
+ kc.close()
+ }
+ }
+
+ protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+ protected val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+ /**
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ */
+ def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
+ commitAsync(offsetRanges, null)
+ }
+
+ /**
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ * @param callback Only the most recently provided callback will be used at commit.
+ */
+ def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
+ commitCallback.set(callback)
+ commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
+ }
+
+ protected def commitAll(): Unit = {
+ val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
+ val it = commitQueue.iterator()
+ while (it.hasNext) {
+ val osr = it.next
+ val tp = osr.topicPartition
+ val x = m.get(tp)
+ val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
+ m.put(tp, new OffsetAndMetadata(offset))
+ }
+ if (!m.isEmpty) {
+ consumer.commitAsync(m, commitCallback.get)
+ }
+ }
+
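+ /**
+ * Checkpoints the offset ranges of each generated batch as plain tuples, and rebuilds the
+ * corresponding KafkaRDDs from those tuples when the stream is restored from a checkpoint.
+ */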
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
+
+ override def update(time: Time): Unit = {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time): Unit = { }
+
+ override def restore(): Unit = {
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V](
+ context.sparkContext,
+ executorKafkaParams,
+ b.map(OffsetRange(_)),
+ getPreferredHosts,
+ // during restore, it's possible the same partition will be consumed from
+ // multiple threads, so don't use the cache
+ false
+ )
+ }
+ }
+ }
+
+ /**
+ * A RateController that retrieves the rate from the RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
+}
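
For context, here is a minimal usage sketch of the API this class backs. It is modeled on the kafka-0-10 integration guide rather than on this exact commit, so it assumes the KafkaUtils.createDirectStream entry point plus the Subscribe and PreferConsistent strategies added alongside this file, an existing StreamingContext named ssc, and the HasOffsetRanges trait from OffsetRange.scala; exact constructor shapes may differ at this revision.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("topicA"), kafkaParams))

    stream.foreachRDD { rdd =>
      // the untilOffset of each range is what commitAsync (via CanCommitOffsets above) commits
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // process the batch first, then queue the offsets; they are committed by commitAll()
      // at the start of a later batch
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }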