Diffstat (limited to 'external/kafka-0-8/src/main/scala/org')
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala                  |  66
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 227
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala            | 425
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala       | 142
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala                | 269
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala       |  42
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala          | 275
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala              | 805
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala             | 109
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala   | 302
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java             |  21
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala                 |  23
12 files changed, 2706 insertions, 0 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
new file mode 100644
index 0000000000..9159051ba0
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
+ */
+final class Broker private(
+ /** Broker's hostname */
+ val host: String,
+ /** Broker's port */
+ val port: Int) extends Serializable {
+ override def equals(obj: Any): Boolean = obj match {
+ case that: Broker =>
+ this.host == that.host &&
+ this.port == that.port
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ 41 * (41 + host.hashCode) + port
+ }
+
+ override def toString(): String = {
+ s"Broker($host, $port)"
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Broker]].
+ */
+@Experimental
+object Broker {
+ def create(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def apply(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def unapply(broker: Broker): Option[(String, Int)] = {
+ if (broker == null) {
+ None
+ } else {
+ Some((broker.host, broker.port))
+ }
+ }
+}
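
A minimal sketch of how the Broker factory and extractor above might be used; the host names and ports are placeholders:

    import org.apache.spark.streaming.kafka.Broker

    // Create broker descriptors via the companion object (the constructor is private).
    val leader = Broker("broker-1.example.com", 9092)
    val other = Broker.create("broker-2.example.com", 9092)

    // unapply allows pattern matching on host and port.
    leader match {
      case Broker(host, port) => println(s"leader is $host:$port")
    }

    // equals/hashCode compare by host and port, not by reference.
    assert(leader == Broker("broker-1.example.com", 9092))
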
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..fb58ed7898
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum
+ * number of messages per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[K]: ClassTag,
+ T <: Decoder[V]: ClassTag,
+ R: ClassTag](
+ _ssc: StreamingContext,
+ val kafkaParams: Map[String, String],
+ val fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ) extends InputDStream[R](_ssc) with Logging {
+ val maxRetries = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRetries", 1)
+
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+
+ /**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
+ protected val kc = new KafkaCluster(kafkaParams)
+
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+
+ protected[streaming] def maxMessagesPerPartition(
+ offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+ // calculate a per-partition rate limit based on current lag
+ val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+ case Some(rate) =>
+ val lagPerPartition = offsets.map { case (tp, offset) =>
+ tp -> Math.max(offset - currentOffsets(tp), 0)
+ }
+ val totalLag = lagPerPartition.values.sum
+
+ lagPerPartition.map { case (tp, lag) =>
+ val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ tp -> (if (maxRateLimitPerPartition > 0) {
+ Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+ }
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ }
+
+ if (effectiveRateLimitPerPartition.values.sum > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some(effectiveRateLimitPerPartition.map {
+ case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ })
+ } else {
+ None
+ }
+ }
+
+ protected var currentOffsets = fromOffsets
+
+ @tailrec
+ protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
+ val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+ // Either.fold would confuse @tailrec, do it manually
+ if (o.isLeft) {
+ val err = o.left.get.toString
+ if (retries <= 0) {
+ throw new SparkException(err)
+ } else {
+ log.error(err)
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ latestLeaderOffsets(retries - 1)
+ }
+ } else {
+ o.right.get
+ }
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
+ val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+ maxMessagesPerPartition(offsets).map { mmp =>
+ mmp.map { case (tp, messages) =>
+ val lo = leaderOffsets(tp)
+ tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
+ }
+ }.getOrElse(leaderOffsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
+ val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
+ val rdd = KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+
+ // Report the record number and metadata of this batch interval to InputInfoTracker.
+ val offsetRanges = currentOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to immutable.List to prevent from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+ currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ }
+
+ def stop(): Unit = {
+ }
+
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
+
+ override def update(time: Time) {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time) { }
+
+ override def restore() {
+ // this is assuming that the topics don't change during execution, which is true currently
+ val topics = fromOffsets.keySet
+ val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
+
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+ }
+ }
+ }
+
+ /**
+ * A RateController to retrieve the rate from RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
+}
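
The back-pressure logic in maxMessagesPerPartition above splits the estimated rate across partitions in proportion to their lag, optionally capped by spark.streaming.kafka.maxRatePerPartition, and clamp then multiplies by the batch duration. A standalone sketch of that split, with made-up rate and lag numbers:

    // Illustrative inputs: estimated rate, per-partition cap, and current lag per partition.
    val rate = 1000                       // latest estimate from the RateController, records/sec
    val maxRateLimitPerPartition = 0      // spark.streaming.kafka.maxRatePerPartition (0 = no cap)
    val lagPerPartition = Map("topic-0" -> 300L, "topic-1" -> 100L)
    val totalLag = lagPerPartition.values.sum

    val effectiveRatePerPartition = lagPerPartition.map { case (tp, lag) =>
      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
      tp -> (if (maxRateLimitPerPartition > 0) {
        Math.min(backpressureRate, maxRateLimitPerPartition)
      } else backpressureRate)
    }
    // => Map(topic-0 -> 750, topic-1 -> 250); a 2-second batch would then read at most
    //    2 * 750 and 2 * 250 messages from those partitions respectively.
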
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
new file mode 100644
index 0000000000..726b5d8ec3
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+@DeveloperApi
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+ import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
+
+ // ConsumerConfig isn't serializable
+ @transient private var _config: SimpleConsumerConfig = null
+
+ def config: SimpleConsumerConfig = this.synchronized {
+ if (_config == null) {
+ _config = SimpleConsumerConfig(kafkaParams)
+ }
+ _config
+ }
+
+ def connect(host: String, port: Int): SimpleConsumer =
+ new SimpleConsumer(host, port, config.socketTimeoutMs,
+ config.socketReceiveBufferBytes, config.clientId)
+
+ def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+ findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+ // Metadata api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+ // scalastyle:on
+
+ def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+ val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+ 0, config.clientId, Seq(topic))
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.find(_.partitionId == partition)
+ }.foreach { pm: PartitionMetadata =>
+ pm.leader.foreach { leader =>
+ return Right((leader.host, leader.port))
+ }
+ }
+ }
+ Left(errs)
+ }
+
+ def findLeaders(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+ val topics = topicAndPartitions.map(_.topic)
+ val response = getPartitionMetadata(topics).right
+ val answer = response.flatMap { tms: Set[TopicMetadata] =>
+ val leaderMap = tms.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+ val tp = TopicAndPartition(tm.topic, pm.partitionId)
+ if (topicAndPartitions(tp)) {
+ pm.leader.map { l =>
+ tp -> (l.host -> l.port)
+ }
+ } else {
+ None
+ }
+ }
+ }.toMap
+
+ if (leaderMap.keys.size == topicAndPartitions.size) {
+ Right(leaderMap)
+ } else {
+ val missing = topicAndPartitions.diff(leaderMap.keySet)
+ val err = new Err
+ err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+ Left(err)
+ }
+ }
+ answer
+ }
+
+ def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
+ getPartitionMetadata(topics).right.map { r =>
+ r.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.map { pm: PartitionMetadata =>
+ TopicAndPartition(tm.topic, pm.partitionId)
+ }
+ }
+ }
+ }
+
+ def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+ val req = TopicMetadataRequest(
+ TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
+
+ if (respErrs.isEmpty) {
+ return Right(resp.topicsMetadata.toSet)
+ } else {
+ respErrs.foreach { m =>
+ val cause = ErrorMapping.exceptionFor(m.errorCode)
+ val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
+ errs.append(new SparkException(msg, cause))
+ }
+ }
+ }
+ Left(errs)
+ }
+
+ // Leader offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+ // scalastyle:on
+
+ def getLatestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
+
+ def getEarliestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
+ getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+ r.map { kv =>
+ // mapValues isn't serializable, see SI-7005
+ kv._1 -> kv._2.head
+ }
+ }
+ }
+
+ private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
+ m.groupBy(_._2).map { kv =>
+ kv._1 -> kv._2.keys.toSeq
+ }
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long,
+ maxNumOffsets: Int
+ ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
+ findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
+ val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
+ val leaders = leaderToTp.keys
+ var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
+ val errs = new Err
+ withBrokers(leaders, errs) { consumer =>
+ val partitionsToGetOffsets: Seq[TopicAndPartition] =
+ leaderToTp((consumer.host, consumer.port))
+ val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
+ tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
+ }.toMap
+ val req = OffsetRequest(reqMap)
+ val resp = consumer.getOffsetsBefore(req)
+ val respMap = resp.partitionErrorAndOffsets
+ partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+ if (por.error == ErrorMapping.NoError) {
+ if (por.offsets.nonEmpty) {
+ result += tp -> por.offsets.map { off =>
+ LeaderOffset(consumer.host, consumer.port, off)
+ }
+ } else {
+ errs.append(new SparkException(
+ s"Empty offsets for ${tp}, is ${before} before log beginning?"))
+ }
+ } else {
+ errs.append(ErrorMapping.exceptionFor(por.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
+ Left(errs)
+ }
+ }
+
+ // Consumer offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ // scalastyle:on
+
+ // this 0 here indicates api version, in this case the original ZK backed api.
+ private def defaultConsumerApiVersion: Short = 0
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, Long]] =
+ getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Long]] = {
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
+ r.map { kv =>
+ kv._1 -> kv._2.offset
+ }
+ }
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+ var result = Map[TopicAndPartition, OffsetMetadataAndError]()
+ val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.fetchOffsets(req)
+ val respMap = resp.requestInfo
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+ if (ome.error == ErrorMapping.NoError) {
+ result += tp -> ome
+ } else {
+ errs.append(ErrorMapping.exceptionFor(ome.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
+ Left(errs)
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ val meta = offsets.map { kv =>
+ kv._1 -> OffsetAndMetadata(kv._2)
+ }
+ setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ var result = Map[TopicAndPartition, Short]()
+ val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
+ val errs = new Err
+ val topicAndPartitions = metadata.keySet
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.commitOffsets(req)
+ val respMap = resp.commitStatus
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { err: Short =>
+ if (err == ErrorMapping.NoError) {
+ result += tp -> err
+ } else {
+ errs.append(ErrorMapping.exceptionFor(err))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
+ Left(errs)
+ }
+
+ // Try a call against potentially multiple brokers, accumulating errors
+ private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
+ (fn: SimpleConsumer => Any): Unit = {
+ brokers.foreach { hp =>
+ var consumer: SimpleConsumer = null
+ try {
+ consumer = connect(hp._1, hp._2)
+ fn(consumer)
+ } catch {
+ case NonFatal(e) =>
+ errs.append(e)
+ } finally {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+ }
+ }
+}
+
+@DeveloperApi
+object KafkaCluster {
+ type Err = ArrayBuffer[Throwable]
+
+ /** If the result is right, return it, otherwise throw SparkException */
+ def checkErrors[T](result: Either[Err, T]): T = {
+ result.fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ }
+
+ case class LeaderOffset(host: String, port: Int, offset: Long)
+
+ /**
+ * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
+ * Simple consumers connect directly to brokers, but need many of the same configs.
+ * This subclass won't warn about missing ZK params, or presence of broker params.
+ */
+ class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
+ extends ConsumerConfig(originalProps) {
+ val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
+ val hpa = hp.split(":")
+ if (hpa.size == 1) {
+ throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+ }
+ (hpa(0), hpa(1).toInt)
+ }
+ }
+
+ object SimpleConsumerConfig {
+ /**
+ * Make a consumer config without requiring group.id or zookeeper.connect,
+ * since communicating with brokers also needs common settings such as timeout
+ */
+ def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
+ // These keys are from other pre-existing kafka configs for specifying brokers, accept either
+ val brokers = kafkaParams.get("metadata.broker.list")
+ .orElse(kafkaParams.get("bootstrap.servers"))
+ .getOrElse(throw new SparkException(
+ "Must specify metadata.broker.list or bootstrap.servers"))
+
+ val props = new Properties()
+ kafkaParams.foreach { case (key, value) =>
+ // prevent warnings on parameters ConsumerConfig doesn't know about
+ if (key != "metadata.broker.list" && key != "bootstrap.servers") {
+ props.put(key, value)
+ }
+ }
+
+ Seq("zookeeper.connect", "group.id").foreach { s =>
+ if (!props.containsKey(s)) {
+ props.setProperty(s, "")
+ }
+ }
+
+ new SimpleConsumerConfig(brokers, props)
+ }
+ }
+}
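
A minimal sketch of driving KafkaCluster directly, assuming a reachable broker at the placeholder address below; checkErrors converts the Either-based results into a thrown SparkException on failure:

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    // Placeholder broker list; "bootstrap.servers" would work equally well.
    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))

    // Look up the partitions of a (hypothetical) topic, then fetch latest leader offsets.
    val partitions = KafkaCluster.checkErrors(kc.getPartitions(Set("my-topic")))
    val latest = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))
    latest.foreach { case (TopicAndPartition(topic, p), lo) =>
      println(s"$topic/$p -> offset ${lo.offset} on leader ${lo.host}:${lo.port}")
    }

    // Consumer-group offsets use the same Either-based error handling.
    val committed: Either[KafkaCluster.Err, Map[TopicAndPartition, Long]] =
      kc.getConsumerOffsets("my-group", partitions)
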
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
new file mode 100644
index 0000000000..3713bda41b
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.Map
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param kafkaParams Map of kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ _ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ useReliableReceiver: Boolean,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
+
+ def getReceiver(): Receiver[(K, V)] = {
+ if (!useReliableReceiver) {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ } else {
+ new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ }
+ }
+}
+
+private[streaming]
+class KafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends Receiver[(K, V)](storageLevel) with Logging {
+
+ // Connection to Kafka
+ var consumerConnector: ConsumerConnector = null
+
+ def onStop() {
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+ }
+
+ def onStart() {
+
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
+
+ // Kafka connection properties
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ val zkConnect = kafkaParams("zookeeper.connect")
+ // Create the connection to the cluster
+ logInfo("Connecting to Zookeeper: " + zkConnect)
+ val consumerConfig = new ConsumerConfig(props)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + zkConnect)
+
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ // Create threads for each topic/message Stream we are listening
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ val executorPool =
+ ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+ try {
+ // Start the messages handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ } finally {
+ executorPool.shutdown() // Just causes threads to terminate after work is done
+ }
+ }
+
+ // Handles Kafka messages
+ private class MessageHandler(stream: KafkaStream[K, V])
+ extends Runnable {
+ def run() {
+ logInfo("Starting MessageHandler.")
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext()) {
+ val msgAndMetadata = streamIterator.next()
+ store((msgAndMetadata.key, msgAndMetadata.message))
+ }
+ } catch {
+ case e: Throwable => reportError("Error handling message; exiting", e)
+ }
+ }
+ }
+}
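
The receiver above is normally obtained through KafkaUtils.createStream (shown further down in this commit) rather than instantiated directly. A brief sketch, with the ZooKeeper quorum, group id and topic map as placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // local[2]: one core for the receiver, one for processing.
    val conf = new SparkConf().setAppName("kafka-receiver-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder ZooKeeper address and group id; "my-topic" is consumed with 2 threads.
    val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 2))

    stream.map(_._2).count().print()   // values only; keys are often null for plain producers
    ssc.start()
    ssc.awaitTermination()
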
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
new file mode 100644
index 0000000000..d4881b140d
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.SimpleConsumer
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[kafka]
+class KafkaRDD[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag] private[spark] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ val offsetRanges: Array[OffsetRange],
+ leaders: Map[TopicAndPartition, (String, Int)],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map { case (o, i) =>
+ val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+ new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+ }.toArray
+ }
+
+ override def count(): Long = offsetRanges.map(_.count).sum
+
+ override def countApprox(
+ timeout: Long,
+ confidence: Double = 0.95
+ ): PartialResult[BoundedDouble] = {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
+
+ override def isEmpty(): Boolean = count == 0L
+
+ override def take(num: Int): Array[R] = {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
+
+ if (num < 1 || nonEmptyPartitions.isEmpty) {
+ return new Array[R](0)
+ }
+
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.count)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ val buf = new ArrayBuffer[R]
+ val res = context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
+ parts.keys.toArray)
+ res.foreach(buf ++= _)
+ buf.toArray
+ }
+
+ override def getPreferredLocations(thePart: Partition): Seq[String] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ // TODO is additional hostname resolution necessary here
+ Seq(part.host)
+ }
+
+ private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+ s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+ private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+ s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates that messages may have been lost"
+
+ private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+ s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates a message may have been skipped"
+
+ override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ if (part.fromOffset == part.untilOffset) {
+ log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+ s"skipping ${part.topic} ${part.partition}")
+ Iterator.empty
+ } else {
+ new KafkaRDDIterator(part, context)
+ }
+ }
+
+ private class KafkaRDDIterator(
+ part: KafkaRDDPartition,
+ context: TaskContext) extends NextIterator[R] {
+
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+ log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+ val kc = new KafkaCluster(kafkaParams)
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[V]]
+ val consumer = connectLeader
+ var requestOffset = part.fromOffset
+ var iter: Iterator[MessageAndOffset] = null
+
+ // The idea is to use the provided preferred host, except on task retry attempts,
+ // to minimize number of kafka metadata requests
+ private def connectLeader: SimpleConsumer = {
+ if (context.attemptNumber > 0) {
+ kc.connectLeader(part.topic, part.partition).fold(
+ errs => throw new SparkException(
+ s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
+ errs.mkString("\n")),
+ consumer => consumer
+ )
+ } else {
+ kc.connect(part.host, part.port)
+ }
+ }
+
+ private def handleFetchErr(resp: FetchResponse) {
+ if (resp.hasError) {
+ val err = resp.errorCode(part.topic, part.partition)
+ if (err == ErrorMapping.LeaderNotAvailableCode ||
+ err == ErrorMapping.NotLeaderForPartitionCode) {
+ log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+ s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ }
+ // Let normal rdd retry sort out reconnect attempts
+ throw ErrorMapping.exceptionFor(err)
+ }
+ }
+
+ private def fetchBatch: Iterator[MessageAndOffset] = {
+ val req = new FetchRequestBuilder()
+ .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
+ .build()
+ val resp = consumer.fetch(req)
+ handleFetchErr(resp)
+ // kafka may return a batch that starts before the requested offset
+ resp.messageSet(part.topic, part.partition)
+ .iterator
+ .dropWhile(_.offset < requestOffset)
+ }
+
+ override def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+
+ override def getNext(): R = {
+ if (iter == null || !iter.hasNext) {
+ iter = fetchBatch
+ }
+ if (!iter.hasNext) {
+ assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ val item = iter.next()
+ if (item.offset >= part.untilOffset) {
+ assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ requestOffset = item.nextOffset
+ messageHandler(new MessageAndMetadata(
+ part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
+ }
+ }
+ }
+ }
+}
+
+private[kafka]
+object KafkaRDD {
+ import KafkaCluster.LeaderOffset
+
+ /**
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+ * ending point of the batch
+ * @param messageHandler function for translating each message into the desired type
+ */
+ def apply[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ untilOffsets: Map[TopicAndPartition, LeaderOffset],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): KafkaRDD[K, V, U, T, R] = {
+ val leaders = untilOffsets.map { case (tp, lo) =>
+ tp -> (lo.host, lo.port)
+ }.toMap
+
+ val offsetRanges = fromOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }.toArray
+
+ new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+}
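
KafkaRDD itself is private[kafka]; batch jobs obtain one through KafkaUtils.createRDD, which is defined later in KafkaUtils.scala beyond the portion excerpted here, and read the ranges back via the HasOffsetRanges trait implemented above. A hedged sketch with placeholder broker, topic and offsets:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc = new SparkContext("local[*]", "kafka-rdd-sketch")

    // Placeholder broker address and offset ranges; untilOffset is exclusive.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val offsetRanges = Array(
      OffsetRange("my-topic", 0, 0L, 100L),
      OffsetRange("my-topic", 1, 0L, 50L))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    // count() is computed from the offset ranges alone: (100 - 0) + (50 - 0) = 150.
    assert(rdd.count() == offsetRanges.map(_.count).sum)
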
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..02917becf0
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.Partition
+
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+ * @param port preferred kafka host's port
+ */
+private[kafka]
+class KafkaRDDPartition(
+ val index: Int,
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long,
+ val host: String,
+ val port: Int
+) extends Partition {
+ /** Number of messages this partition refers to */
+ def count(): Long = untilOffset - fromOffset
+}
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
new file mode 100644
index 0000000000..d9d4240c05
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * This class lives in src rather than test so that it can also be used to test the Python Kafka APIs.
+ */
+private[kafka] class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkClient: ZkClient = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: Producer[String, String] = _
+
+ // Flag to test whether the system is correctly started
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkClient = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkClient).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ ZKStringSerializer)
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ (server, port)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Teardown the whole servers, including Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
+
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkClient, topic, partitions, 1)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
+ }
+
+ /** Single-argument version for backwards compatibility */
+ def createTopic(topic: String): Unit = createTopic(topic, 1)
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Unit = {
+ producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+ producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+ producer.close()
+ producer = null
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("metadata.broker.list", brokerAddress)
+ props.put("serializer.class", classOf[StringEncoder].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("request.required.acks", "-1")
+ props
+ }
+
+ // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
+ // dependency
+ def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+ def makeAttempt(): Either[Throwable, T] = {
+ try {
+ Right(func)
+ } catch {
+ case e if NonFatal(e) => Left(e)
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ @tailrec
+ def tryAgain(attempt: Int): T = {
+ makeAttempt() match {
+ case Right(result) => result
+ case Left(e) =>
+ val duration = System.currentTimeMillis() - startTime
+ if (duration < timeout.milliseconds) {
+ Thread.sleep(interval.milliseconds)
+ } else {
+ throw new TimeoutException(e.getMessage)
+ }
+
+ tryAgain(attempt + 1)
+ }
+ }
+
+ tryAgain(1)
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ Utils.deleteRecursively(snapshotDir)
+ Utils.deleteRecursively(logDir)
+ }
+ }
+}
+
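
A sketch of how a test suite might drive KafkaTestUtils; since the class is private[kafka], such a test would live in the org.apache.spark.streaming.kafka package, and the topic name and messages below are illustrative:

    import org.apache.spark.streaming.kafka.KafkaTestUtils

    // Bring up an embedded ZooKeeper + Kafka broker, publish a few records, then tear down.
    val kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup()
    try {
      kafkaTestUtils.createTopic("test-topic", 1)
      kafkaTestUtils.sendMessages("test-topic", Map("a" -> 2, "b" -> 3))   // two "a"s, three "b"s
      // A test would now point kafkaParams at kafkaTestUtils.brokerAddress
      // (or zkQuorum at kafkaTestUtils.zkAddress) and consume from "test-topic".
    } finally {
      kafkaTestUtils.teardown()
    }
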
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000000..edaafb912c
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.OutputStream
+import java.lang.{Integer => JInt, Long => JLong}
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList, Map => JMap, Set => JSet}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
+import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java._
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+
+object KafkaUtils {
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param ssc StreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
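+ *
+ * Example (a sketch assuming String keys/values and that kafka.serializer.StringDecoder is
+ * imported; the ZooKeeper address, group id and topic name are illustrative):
+ * {{{
+ *   val kafkaParams = Map("zookeeper.connect" -> "zk1:2181", "group.id" -> "my-consumer-group")
+ *   val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ *     ssc, kafkaParams, Map("pageviews" -> 2), StorageLevel.MEMORY_AND_DISK_SER_2)
+ * }}}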
+ * @param ssc StreamingContext object
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[(K, V)] = {
+ val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
+ new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairReceiverInputDStream[String, String] = {
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairReceiverInputDStream[String, String] = {
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param jssc JavaStreamingContext object
+ * @param keyTypeClass Key type of DStream
+ * @param valueTypeClass Value type of DStream
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel RDD storage level.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ jssc: JavaStreamingContext,
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairReceiverInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
+
+ implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+ implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
+
+ createStream[K, V, U, T](
+ jssc.ssc,
+ kafkaParams.asScala.toMap,
+ Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /** Get leaders for the given offset ranges, or throw an exception. */
+ private def leadersForRanges(
+ kc: KafkaCluster,
+ offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+ val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+ val leaders = kc.findLeaders(topics)
+ KafkaCluster.checkErrors(leaders)
+ }
+
+ /** Make sure offsets are available in Kafka, or throw an exception. */
+ private def checkOffsets(
+ kc: KafkaCluster,
+ offsetRanges: Array[OffsetRange]): Unit = {
+ val topics = offsetRanges.map(_.topicAndPartition).toSet
+ val result = for {
+ low <- kc.getEarliestLeaderOffsets(topics).right
+ high <- kc.getLatestLeaderOffsets(topics).right
+ } yield {
+ offsetRanges.filterNot { o =>
+ low(o.topicAndPartition).offset <= o.fromOffset &&
+ o.untilOffset <= high(o.topicAndPartition).offset
+ }
+ }
+ val badRanges = KafkaCluster.checkErrors(result)
+ if (badRanges.nonEmpty) {
+ throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
+ }
+ }
+
+ private[kafka] def getFromOffsets(
+ kc: KafkaCluster,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): Map[TopicAndPartition, Long] = {
+ val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+ val result = for {
+ topicPartitions <- kc.getPartitions(topics).right
+ leaderOffsets <- (if (reset == Some("smallest")) {
+ kc.getEarliestLeaderOffsets(topicPartitions)
+ } else {
+ kc.getLatestLeaderOffsets(topicPartitions)
+ }).right
+ } yield {
+ leaderOffsets.map { case (tp, lo) =>
+ (tp, lo.offset)
+ }
+ }
+ KafkaCluster.checkErrors(result)
+ }
+
+ /**
+ * Create an RDD from Kafka using offset ranges for each topic and partition.
+ *
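+ * Example (a sketch; broker addresses, topic and offsets are illustrative, and
+ * kafka.serializer.StringDecoder is assumed to be imported):
+ * {{{
+ *   val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
+ *   val offsetRanges = Array(OffsetRange("pageviews", 0, 0, 100))
+ *   val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ *     sc, kafkaParams, offsetRanges)
+ * }}}
+ *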
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
+ */
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): RDD[(K, V)] = sc.withScope {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val leaders = leadersForRanges(kc, offsetRanges)
+ checkOffsets(kc, offsetRanges)
+ new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+
+ /**
+ * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and to access the message as well
+ * as the metadata.
+ *
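+ * Example (a sketch reusing kafkaParams and offsetRanges as in the overload above; an empty
+ * leader map means leaders are looked up on the driver):
+ * {{{
+ *   val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
+ *   val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
+ *     sc, kafkaParams, offsetRanges, Map[TopicAndPartition, Broker](), handler)
+ * }}}
+ *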
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
+ */
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: Map[TopicAndPartition, Broker],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): RDD[R] = sc.withScope {
+ val kc = new KafkaCluster(kafkaParams)
+ val leaderMap = if (leaders.isEmpty) {
+ leadersForRanges(kc, offsetRanges)
+ } else {
+ // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
+ leaders.map {
+ case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+ }
+ }
+ val cleanedHandler = sc.clean(messageHandler)
+ checkOffsets(kc, offsetRanges)
+ new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
+ }
+
+ /**
+ * Create an RDD from Kafka using offset ranges for each topic and partition.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param keyClass type of Kafka message key
+ * @param valueClass type of Kafka message value
+ * @param keyDecoderClass type of Kafka message key decoder
+ * @param valueDecoderClass type of Kafka message value decoder
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): JavaPairRDD[K, V] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ new JavaPairRDD(createRDD[K, V, KD, VD](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
+ }
+
+ /**
+ * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and to access the message as well
+ * as the metadata.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaRDD[R] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val leaderMap = Map(leaders.asScala.toSeq: _*)
+ createRDD[K, V, KD, VD, R](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_))
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that each record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
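+ * Example (a sketch; the broker address, topic, partition and starting offset are illustrative):
+ * {{{
+ *   val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
+ *   val fromOffsets = Map(TopicAndPartition("pageviews", 0) -> 100L)
+ *   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+ *     ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => mmd.message)
+ * }}}
+ *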
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
+ */
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): InputDStream[R] = {
+ val cleanedHandler = ssc.sc.clean(messageHandler)
+ new DirectKafkaInputDStream[K, V, KD, VD, R](
+ ssc, kafkaParams, fromOffsets, cleanedHandler)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that each record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
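+ * Example (a sketch; the broker address and topic name are illustrative):
+ * {{{
+ *   val kafkaParams = Map("metadata.broker.list" -> "broker1:9092", "auto.offset.reset" -> "smallest")
+ *   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ *     ssc, kafkaParams, Set("pageviews"))
+ *   stream.foreachRDD { rdd =>
+ *     val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *     // e.g. report the consumed ranges to a Kafka monitoring tool
+ *   }
+ * }}}
+ *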
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): InputDStream[(K, V)] = {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
+ new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
+ ssc, kafkaParams, fromOffsets, messageHandler)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that each record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param recordClass Class of the records in DStream
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
+ */
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaInputDStream[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
+ createDirectStream[K, V, KD, VD, R](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
+ cleanedHandler
+ )
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that each record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ topics: JSet[String]
+ ): JavaPairInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ createDirectStream[K, V, KD, VD](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Set(topics.asScala.toSeq: _*)
+ )
+ }
+}
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and set of functions so that it can be easily
+ * instantiated and called from Python's KafkaUtils.
+ *
+ * The zero-arg constructor helps instantiate this class from the Class object
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
+ * fills in known parameters so that they do not have to be passed from Python.
+ */
+private[kafka] class KafkaUtilsPythonHelper {
+ import KafkaUtilsPythonHelper._
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+ KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+ jssc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ kafkaParams,
+ topics,
+ storageLevel)
+ }
+
+ def createRDDWithoutMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
+ new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler))
+ }
+
+ def createRDDWithMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
+ val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
+ new PythonMessageAndMetadata(
+ mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
+ val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaRDD(rdd)
+ }
+
+ private def createRDD[V: ClassTag](
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker],
+ messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = {
+ KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+ jsc.sc,
+ kafkaParams.asScala.toMap,
+ offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+ leaders.asScala.toMap,
+ messageHandler
+ )
+ }
+
+ def createDirectStreamWithoutMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
+ new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
+ }
+
+ def createDirectStreamWithMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+ val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
+ new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
+ val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaDStream(stream)
+ }
+
+ private def createDirectStream[V: ClassTag](
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
+
+ val currentFromOffsets = if (!fromOffsets.isEmpty) {
+ val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
+ if (topicsFromOffsets != topics.asScala.toSet) {
+ throw new IllegalStateException(
+ s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
+ s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
+ }
+ Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
+ } else {
+ val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*))
+ KafkaUtils.getFromOffsets(
+ kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
+ }
+
+ KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(currentFromOffsets.toSeq: _*),
+ messageHandler)
+ }
+
+ def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong
+ ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+ def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+ TopicAndPartition(topic, partition)
+
+ def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+ def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+ val parentRDDs = rdd.getNarrowAncestors
+ val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
+
+ require(
+ kafkaRDDs.length == 1,
+ "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
+ "with this RDD, please call this method only on a Kafka RDD.")
+
+ val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
+ kafkaRDD.offsetRanges.toSeq.asJava
+ }
+}
+
+private object KafkaUtilsPythonHelper {
+ private var initialized = false
+
+ def initialize(): Unit = {
+ SerDeUtil.initialize()
+ synchronized {
+ if (!initialized) {
+ new PythonMessageAndMetadataPickler().register()
+ initialized = true
+ }
+ }
+ }
+
+ initialize()
+
+ def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
+ new SerDeUtil.AutoBatchedPickler(iter)
+ }
+
+ case class PythonMessageAndMetadata(
+ topic: String,
+ partition: JInt,
+ offset: JLong,
+ key: Array[Byte],
+ message: Array[Byte])
+
+ class PythonMessageAndMetadataPickler extends IObjectPickler {
+ private val module = "pyspark.streaming.kafka"
+
+ def register(): Unit = {
+ Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
+ Pickler.registerCustomPickler(this.getClass, this)
+ }
+
+ def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
+ if (obj == this) {
+ out.write(Opcodes.GLOBAL)
+ out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
+ } else {
+ pickler.save(this)
+ val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
+ out.write(Opcodes.MARK)
+ pickler.save(msgAndMetaData.topic)
+ pickler.save(msgAndMetaData.partition)
+ pickler.save(msgAndMetaData.offset)
+ pickler.save(msgAndMetaData.key)
+ pickler.save(msgAndMetaData.message)
+ out.write(Opcodes.TUPLE)
+ out.write(Opcodes.REDUCE)
+ }
+ }
+ }
+}
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
new file mode 100644
index 0000000000..d9b856e469
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+trait HasOffsetRanges {
+ def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long) extends Serializable {
+ import OffsetRange.OffsetRangeTuple
+
+ /** Kafka TopicAndPartition object, for convenience */
+ def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
+
+ /** Number of messages this OffsetRange refers to */
+ def count(): Long = untilOffset - fromOffset
+
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+ }
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[streaming]
+ def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
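+ *
+ * Example (the topic name and offsets are illustrative):
+ * {{{
+ *   val range = OffsetRange.create("pageviews", partition = 0, fromOffset = 100, untilOffset = 200)
+ *   range.count()  // 100 messages
+ * }}}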
+ */
+object OffsetRange {
+ def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def create(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def apply(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[kafka]
+ type OffsetRangeTuple = (String, Int, Long, Long)
+
+ private[kafka]
+ def apply(t: OffsetRangeTuple) =
+ new OffsetRange(t._1, t._2, t._3, t._4)
+}
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000000..39abe3c3e2
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+
+import scala.collection.{mutable, Map}
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition offsets itself and updates the offset information
+ * after data is reliably stored in the write-ahead log. Offsets are only updated after data has
+ * been reliably stored, so the potential data loss problem of KafkaReceiver is eliminated.
+ *
+ * Note: ReliableKafkaReceiver sets auto.commit.enable to false to turn off the automatic offset
+ * commit mechanism of the Kafka consumer, so setting this configuration manually within
+ * kafkaParams has no effect.
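+ *
+ * A minimal sketch of how an application opts in (the receiver itself is created internally by
+ * KafkaUtils.createStream once the write-ahead log is enabled; host names, group id and topic
+ * are illustrative):
+ * {{{
+ *   val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ *   val ssc = new StreamingContext(conf, Seconds(2))
+ *   val stream = KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("pageviews" -> 1))
+ * }}}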
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel)
+ extends Receiver[(K, V)](storageLevel) with Logging {
+
+ private val groupId = kafkaParams("group.id")
+ private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+ private def conf = SparkEnv.get.conf
+
+ /** High level consumer to connect to Kafka. */
+ private var consumerConnector: ConsumerConnector = null
+
+ /** zkClient to connect to Zookeeper to commit the offsets. */
+ private var zkClient: ZkClient = null
+
+ /**
+ * A HashMap to manage the offset for each topic/partition. This HashMap is only accessed inside
+ * synchronized blocks, so a mutable HashMap does not cause concurrency issues.
+ */
+ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+ private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+ /**
+ * Manage the BlockGenerator in the receiver itself to better coordinate block storage and
+ * offset commits.
+ */
+ private var blockGenerator: BlockGenerator = null
+
+ /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
+ private var messageHandlerThreadPool: ThreadPoolExecutor = null
+
+ override def onStart(): Unit = {
+ logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+ // Initialize the topic-partition / offset hash map.
+ topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+ // Initialize the stream block id / offset snapshot hash map.
+ blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
+
+ // Initialize the block generator for storing Kafka message.
+ blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
+
+ if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+ logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
+ "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
+ }
+
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+ // Manually set "auto.commit.enable" to "false", no matter whether the user explicitly set it
+ // to true: this property must be false to turn off the automatic offset commit mechanism in Kafka.
+ props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+ val consumerConfig = new ConsumerConfig(props)
+
+ assert(!consumerConfig.autoCommitEnable)
+
+ logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
+ consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+ messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
+ topics.values.sum, "KafkaMessageHandler")
+
+ blockGenerator.start()
+
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream =>
+ messageHandlerThreadPool.submit(new MessageHandler(stream))
+ }
+ }
+ }
+
+ override def onStop(): Unit = {
+ if (messageHandlerThreadPool != null) {
+ messageHandlerThreadPool.shutdown()
+ messageHandlerThreadPool = null
+ }
+
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (blockGenerator != null) {
+ blockGenerator.stop()
+ blockGenerator = null
+ }
+
+ if (topicPartitionOffsetMap != null) {
+ topicPartitionOffsetMap.clear()
+ topicPartitionOffsetMap = null
+ }
+
+ if (blockOffsetMap != null) {
+ blockOffsetMap.clear()
+ blockOffsetMap = null
+ }
+ }
+
+ /** Store a Kafka message and the associated metadata as a tuple. */
+ private def storeMessageAndMetadata(
+ msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
+ val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+ val data = (msgAndMetadata.key, msgAndMetadata.message)
+ val metadata = (topicAndPartition, msgAndMetadata.offset)
+ blockGenerator.addDataWithCallback(data, metadata)
+ }
+
+ /** Update stored offset */
+ private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+ topicPartitionOffsetMap.put(topicAndPartition, offset)
+ }
+
+ /**
+ * Remember the current offsets for each topic and partition. This is called when a block is
+ * generated.
+ */
+ private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
+ // Get a snapshot of current offset map and store with related block id.
+ val offsetSnapshot = topicPartitionOffsetMap.toMap
+ blockOffsetMap.put(blockId, offsetSnapshot)
+ topicPartitionOffsetMap.clear()
+ }
+
+ /**
+ * Store the generated block and commit the related offsets to Zookeeper. This method
+ * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
+ */
+ private def storeBlockAndCommitOffset(
+ blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ var count = 0
+ var pushed = false
+ var exception: Exception = null
+ while (!pushed && count <= 3) {
+ try {
+ store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+ pushed = true
+ } catch {
+ case ex: Exception =>
+ count += 1
+ exception = ex
+ }
+ }
+ if (pushed) {
+ Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+ blockOffsetMap.remove(blockId)
+ } else {
+ stop("Error while storing block into Spark", exception)
+ }
+ }
+
+ /**
+ * Commit the offsets of Kafka topics/partitions. The commit mechanism follows Kafka 0.8.x's
+ * metadata schema in Zookeeper.
+ */
+ private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+ if (zkClient == null) {
+ val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+ stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
+ return
+ }
+
+ for ((topicAndPart, offset) <- offsetMap) {
+ try {
+ val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+ ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception during commit offset $offset for topic" +
+ s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
+ }
+
+ logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+ s"partition ${topicAndPart.partition}")
+ }
+ }
+
+ /** Class to handle received Kafka message. */
+ private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+ override def run(): Unit = {
+ while (!isStopped) {
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext) {
+ storeMessageAndMetadata(streamIterator.next)
+ }
+ } catch {
+ case e: Exception =>
+ reportError("Error handling message", e)
+ }
+ }
+ }
+ }
+
+ /** Class to handle blocks generated by the block generator. */
+ private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+ def onAddData(data: Any, metadata: Any): Unit = {
+ // Update the offset of the data that was added to the generator
+ if (metadata != null) {
+ val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+ updateOffset(topicAndPartition, offset)
+ }
+ }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ // Remember the offsets of topics/partitions when a block has been generated
+ rememberBlockOffsets(blockId)
+ }
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ // Store block and commit the blocks offset
+ storeBlockAndCommitOffset(blockId, arrayBuffer)
+ }
+
+ def onError(message: String, throwable: Throwable): Unit = {
+ reportError(message, throwable)
+ }
+ }
+}
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
new file mode 100644
index 0000000000..2e5ab0fb3b
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka receiver for Spark Streaming.
+ */
+package org.apache.spark.streaming.kafka;
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
new file mode 100644
index 0000000000..47c5187f87
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Kafka receiver for Spark Streaming.
+ */
+package object kafka