author     cody koeninger <cody@koeninger.org>          2015-02-04 12:06:34 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-02-04 12:06:34 -0800
commit     b0c0021953826bccaee818a54afc44e8bdfa8572 (patch)
tree       2d196c9e214dcdc15f4d6598c1dc1e17dce363ca /external
parent     ac0b2b788ff144970d6fdbdc445367772770458d (diff)
[SPARK-4964] [Streaming] Exactly-once semantics for Kafka
Author: cody koeninger <cody@koeninger.org>

Closes #3798 from koeninger/kafkaRdd and squashes the following commits:

1dc2941 [cody koeninger] [SPARK-4964] silence ConsumerConfig warnings about broker connection props
59e29f6 [cody koeninger] [SPARK-4964] settle on "Direct" as a naming convention for the new stream
8c31855 [cody koeninger] [SPARK-4964] remove HasOffsetRanges interface from return types
0df3ebe [cody koeninger] [SPARK-4964] add comments per pwendell / dibbhatt
8991017 [cody koeninger] [SPARK-4964] formatting
825110f [cody koeninger] [SPARK-4964] rename stuff per TD
4354bce [cody koeninger] [SPARK-4964] per td, remove java interfaces, replace with final classes, corresponding changes to KafkaRDD constructor and checkpointing
9adaa0a [cody koeninger] [SPARK-4964] formatting
0090553 [cody koeninger] [SPARK-4964] javafication of interfaces
9a838c2 [cody koeninger] [SPARK-4964] code cleanup, add more tests
2b340d8 [cody koeninger] [SPARK-4964] refactor per TD feedback
80fd6ae [cody koeninger] [SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising, change doc
99d2eba [cody koeninger] [SPARK-4964] Reduce level of nesting. If beginning is past end, its actually an error (may happen if Kafka topic was deleted and recreated)
19406cc [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
2e67117 [cody koeninger] [SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
bb80bbe [cody koeninger] [SPARK-4964] scalastyle line length
d4a7cf7 [cody koeninger] [SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
c1bd6d9 [cody koeninger] [SPARK-4964] use newly available attemptNumber for correct retry behavior
548d529 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
0458e4e [cody koeninger] [SPARK-4964] recovery of generated rdds from checkpoint
e86317b [cody koeninger] [SPARK-4964] try seed brokers in random order to spread metadata requests
e93eb72 [cody koeninger] [SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
356c7cc [cody koeninger] [SPARK-4964] code cleanup per helena
adf99a6 [cody koeninger] [SPARK-4964] fix serialization issues for checkpointing
1d50749 [cody koeninger] [SPARK-4964] code cleanup per tdas
8bfd6c0 [cody koeninger] [SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
e09045b [cody koeninger] [SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent map + empty foreach boilerplate
cac63ee [cody koeninger] additional testing, fix fencepost error
37d3053 [cody koeninger] make KafkaRDDPartition available to users so offsets can be committed per partition
bcca8a4 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
6bf14f2 [cody koeninger] first attempt at a Kafka dstream that allows for exactly-once semantics
326ff3c [cody koeninger] add some tests
38bb727 [cody koeninger] give easy access to the parameters of a KafkaRDD
979da25 [cody koeninger] dont allow empty leader offsets to be returned
8d7de4a [cody koeninger] make sure leader offsets can be found even for leaders that arent in the seed brokers
4b078bf [cody koeninger] differentiate between leader and consumer offsets in error message
3c2a96a [cody koeninger] fix scalastyle errors
29c6b43 [cody koeninger] cleanup logging
783b477 [cody koeninger] update tests for kafka 8.1.1
7d050bc [cody koeninger] methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior
ce91c59 [cody koeninger] method to get consumer offsets, explicit error handling
4dafd1b [cody koeninger] method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster
0b94b33 [cody koeninger] use dropWhile rather than filter to trim beginning of fetch response
1d70625 [cody koeninger] WIP on kafka cluster
76913e2 [cody koeninger] Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/pom.xml | 2
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 162
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala | 369
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 224
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala | 59
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 178
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala | 46
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala | 70
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala | 73
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala | 92
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 99
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 8
12 files changed, 1376 insertions(+), 6 deletions(-)
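
A minimal sketch of how the direct API introduced by this patch can be used end to end. The broker address ("broker1:9092") and topic name ("events") are placeholders; the KafkaUtils.createDirectStream signature matches the one added in KafkaUtils.scala below.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectStreamSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("direct-kafka-sketch"), Seconds(2))

        // Brokers, not zookeeper; "auto.offset.reset" decides where a fresh stream starts.
        val kafkaParams = Map(
          "metadata.broker.list" -> "broker1:9092",
          "auto.offset.reset" -> "smallest")

        // One RDD partition per Kafka topic/partition, no receivers involved.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("events"))

        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }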
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index b29b050965..af96138d79 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.0</version>
+ <version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..c7bca43eb8
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives
+ * the maximum number of messages per second
+ * that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag](
+ @transient ssc_ : StreamingContext,
+ val kafkaParams: Map[String, String],
+ val fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+) extends InputDStream[R](ssc_) with Logging {
+ val maxRetries = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRetries", 1)
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+ protected val kc = new KafkaCluster(kafkaParams)
+
+ protected val maxMessagesPerPartition: Option[Long] = {
+ val ratePerSec = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+ if (ratePerSec > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some((secsPerBatch * ratePerSec).toLong)
+ } else {
+ None
+ }
+ }
+
+ protected var currentOffsets = fromOffsets
+
+ @tailrec
+ protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
+ val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+ // Either.fold would confuse @tailrec, do it manually
+ if (o.isLeft) {
+ val err = o.left.get.toString
+ if (retries <= 0) {
+ throw new SparkException(err)
+ } else {
+ log.error(err)
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ latestLeaderOffsets(retries - 1)
+ }
+ } else {
+ o.right.get
+ }
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
+ maxMessagesPerPartition.map { mmp =>
+ leaderOffsets.map { case (tp, lo) =>
+ tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+ }
+ }.getOrElse(leaderOffsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
+ val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
+ val rdd = KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+
+ currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ }
+
+ def stop(): Unit = {
+ }
+
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime = data.asInstanceOf[mutable.HashMap[
+ Time, Array[OffsetRange.OffsetRangeTuple]]]
+
+ override def update(time: Time) {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time) { }
+
+ override def restore() {
+ // this is assuming that the topics don't change during execution, which is true currently
+ val topics = fromOffsets.keySet
+ val leaders = kc.findLeaders(topics).fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+ }
+ }
+ }
+
+}
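
A standalone sketch of the rate-limiting arithmetic in maxMessagesPerPartition and clamp() above: the per-batch cap for a partition is maxRatePerPartition (messages per second) times the batch duration in seconds, and the until offset never moves past the leader's latest offset. The object and parameter names here are illustrative, not part of the patch.

    object RateLimitSketch {
      // Mirrors maxMessagesPerPartition and clamp() above; names are illustrative.
      def clampUntilOffset(
          currentOffset: Long,      // where the previous batch left off
          latestLeaderOffset: Long, // latest offset reported by the partition leader
          maxRatePerPartition: Int, // spark.streaming.kafka.maxRatePerPartition
          batchMillis: Long): Long = {
        if (maxRatePerPartition <= 0) {
          latestLeaderOffset
        } else {
          val maxMessages = (batchMillis.toDouble / 1000 * maxRatePerPartition).toLong
          math.min(currentOffset + maxMessages, latestLeaderOffset)
        }
      }

      def main(args: Array[String]): Unit = {
        // A 500 ms batch with maxRatePerPartition = 10000 caps each partition at
        // 5000 messages per batch: 100 + 5000 = 5100, well below the leader's 1000000.
        println(clampUntilOffset(100L, 1000000L, 10000, 500L)) // prints 5100
      }
    }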
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
new file mode 100644
index 0000000000..ccc62bfe8f
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+ import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
+
+ // ConsumerConfig isn't serializable
+ @transient private var _config: SimpleConsumerConfig = null
+
+ def config: SimpleConsumerConfig = this.synchronized {
+ if (_config == null) {
+ _config = SimpleConsumerConfig(kafkaParams)
+ }
+ _config
+ }
+
+ def connect(host: String, port: Int): SimpleConsumer =
+ new SimpleConsumer(host, port, config.socketTimeoutMs,
+ config.socketReceiveBufferBytes, config.clientId)
+
+ def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+ findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+ // Metadata api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+ // scalastyle:on
+
+ def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+ val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+ 0, config.clientId, Seq(topic))
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.find(_.partitionId == partition)
+ }.foreach { pm: PartitionMetadata =>
+ pm.leader.foreach { leader =>
+ return Right((leader.host, leader.port))
+ }
+ }
+ }
+ Left(errs)
+ }
+
+ def findLeaders(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+ val topics = topicAndPartitions.map(_.topic)
+ val response = getPartitionMetadata(topics).right
+ val answer = response.flatMap { tms: Set[TopicMetadata] =>
+ val leaderMap = tms.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+ val tp = TopicAndPartition(tm.topic, pm.partitionId)
+ if (topicAndPartitions(tp)) {
+ pm.leader.map { l =>
+ tp -> (l.host -> l.port)
+ }
+ } else {
+ None
+ }
+ }
+ }.toMap
+
+ if (leaderMap.keys.size == topicAndPartitions.size) {
+ Right(leaderMap)
+ } else {
+ val missing = topicAndPartitions.diff(leaderMap.keySet)
+ val err = new Err
+ err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+ Left(err)
+ }
+ }
+ answer
+ }
+
+ def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
+ getPartitionMetadata(topics).right.map { r =>
+ r.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.map { pm: PartitionMetadata =>
+ TopicAndPartition(tm.topic, pm.partitionId)
+ }
+ }
+ }
+ }
+
+ def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+ val req = TopicMetadataRequest(
+ TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ // error codes here indicate missing / just created topic,
+ // repeating on a different broker won't be useful
+ return Right(resp.topicsMetadata.toSet)
+ }
+ Left(errs)
+ }
+
+ // Leader offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+ // scalastyle:on
+
+ def getLatestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
+
+ def getEarliestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
+ getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+ r.map { kv =>
+ // mapValues isn't serializable, see SI-7005
+ kv._1 -> kv._2.head
+ }
+ }
+ }
+
+ private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
+ m.groupBy(_._2).map { kv =>
+ kv._1 -> kv._2.keys.toSeq
+ }
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long,
+ maxNumOffsets: Int
+ ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
+ findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
+ val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
+ val leaders = leaderToTp.keys
+ var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
+ val errs = new Err
+ withBrokers(leaders, errs) { consumer =>
+ val partitionsToGetOffsets: Seq[TopicAndPartition] =
+ leaderToTp((consumer.host, consumer.port))
+ val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
+ tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
+ }.toMap
+ val req = OffsetRequest(reqMap)
+ val resp = consumer.getOffsetsBefore(req)
+ val respMap = resp.partitionErrorAndOffsets
+ partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+ if (por.error == ErrorMapping.NoError) {
+ if (por.offsets.nonEmpty) {
+ result += tp -> por.offsets.map { off =>
+ LeaderOffset(consumer.host, consumer.port, off)
+ }
+ } else {
+ errs.append(new SparkException(
+ s"Empty offsets for ${tp}, is ${before} before log beginning?"))
+ }
+ } else {
+ errs.append(ErrorMapping.exceptionFor(por.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
+ Left(errs)
+ }
+ }
+
+ // Consumer offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ // scalastyle:on
+
+ /** Requires Kafka >= 0.8.1.1 */
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, Long]] = {
+ getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+ r.map { kv =>
+ kv._1 -> kv._2.offset
+ }
+ }
+ }
+
+ /** Requires Kafka >= 0.8.1.1 */
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+ var result = Map[TopicAndPartition, OffsetMetadataAndError]()
+ val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.fetchOffsets(req)
+ val respMap = resp.requestInfo
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+ if (ome.error == ErrorMapping.NoError) {
+ result += tp -> ome
+ } else {
+ errs.append(ErrorMapping.exceptionFor(ome.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
+ Left(errs)
+ }
+
+ /** Requires Kafka >= 0.8.1.1 */
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long]
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ setConsumerOffsetMetadata(groupId, offsets.map { kv =>
+ kv._1 -> OffsetMetadataAndError(kv._2)
+ })
+ }
+
+ /** Requires Kafka >= 0.8.1.1 */
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ var result = Map[TopicAndPartition, Short]()
+ val req = OffsetCommitRequest(groupId, metadata)
+ val errs = new Err
+ val topicAndPartitions = metadata.keySet
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.commitOffsets(req)
+ val respMap = resp.requestInfo
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { err: Short =>
+ if (err == ErrorMapping.NoError) {
+ result += tp -> err
+ } else {
+ errs.append(ErrorMapping.exceptionFor(err))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
+ Left(errs)
+ }
+
+ // Try a call against potentially multiple brokers, accumulating errors
+ private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
+ (fn: SimpleConsumer => Any): Unit = {
+ brokers.foreach { hp =>
+ var consumer: SimpleConsumer = null
+ try {
+ consumer = connect(hp._1, hp._2)
+ fn(consumer)
+ } catch {
+ case NonFatal(e) =>
+ errs.append(e)
+ } finally {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+ }
+ }
+}
+
+private[spark]
+object KafkaCluster {
+ type Err = ArrayBuffer[Throwable]
+
+ private[spark]
+ case class LeaderOffset(host: String, port: Int, offset: Long)
+
+ /**
+ * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
+ * Simple consumers connect directly to brokers, but need many of the same configs.
+ * This subclass won't warn about missing ZK params, or presence of broker params.
+ */
+ private[spark]
+ class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
+ extends ConsumerConfig(originalProps) {
+ val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
+ val hpa = hp.split(":")
+ (hpa(0), hpa(1).toInt)
+ }
+ }
+
+ private[spark]
+ object SimpleConsumerConfig {
+ /**
+ * Make a consumer config without requiring group.id or zookeeper.connect,
+ * since communicating with brokers also needs common settings such as timeout
+ */
+ def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
+ // These keys are from other pre-existing kafka configs for specifying brokers, accept either
+ val brokers = kafkaParams.get("metadata.broker.list")
+ .orElse(kafkaParams.get("bootstrap.servers"))
+ .getOrElse(throw new SparkException(
+ "Must specify metadata.broker.list or bootstrap.servers"))
+
+ val props = new Properties()
+ kafkaParams.foreach { case (key, value) =>
+ // prevent warnings on parameters ConsumerConfig doesn't know about
+ if (key != "metadata.broker.list" && key != "bootstrap.servers") {
+ props.put(key, value)
+ }
+ }
+
+ Seq("zookeeper.connect", "group.id").foreach { s =>
+ if (!props.contains(s)) {
+ props.setProperty(s, "")
+ }
+ }
+
+ new SimpleConsumerConfig(brokers, props)
+ }
+ }
+}
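
KafkaCluster is private[spark] in this patch, so the sketch below assumes it is compiled inside the org.apache.spark.streaming.kafka package, the same way the test suites are; the broker string and topic are placeholders. It chains the Either-returning metadata calls to turn a topic into its latest per-partition offsets.

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition
    import org.apache.spark.SparkException

    object KafkaClusterSketch {
      // Returns the latest leader offset for every partition of `topic`.
      def latestOffsets(brokers: String, topic: String): Map[TopicAndPartition, Long] = {
        val kc = new KafkaCluster(Map("metadata.broker.list" -> brokers))
        val result = for {
          partitions <- kc.getPartitions(Set(topic)).right
          offsets <- kc.getLatestLeaderOffsets(partitions).right
        } yield offsets.map { case (tp, lo) => tp -> lo.offset }
        result.fold(
          errs => throw new SparkException(errs.mkString("\n")),
          ok => ok)
      }
    }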
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
new file mode 100644
index 0000000000..50bf7cbdb8
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag] private[spark] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ val offsetRanges: Array[OffsetRange],
+ leaders: Map[TopicAndPartition, (String, Int)],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map { case (o, i) =>
+ val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+ new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+ }.toArray
+ }
+
+ override def getPreferredLocations(thePart: Partition): Seq[String] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ // TODO is additional hostname resolution necessary here
+ Seq(part.host)
+ }
+
+ private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+ s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+ private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+ s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates that messages may have been lost"
+
+ private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+ s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates a message may have been skipped"
+
+ override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ if (part.fromOffset == part.untilOffset) {
+ log.warn("Beginning offset ${part.fromOffset} is the same as ending offset " +
+ s"skipping ${part.topic} ${part.partition}")
+ Iterator.empty
+ } else {
+ new KafkaRDDIterator(part, context)
+ }
+ }
+
+ private class KafkaRDDIterator(
+ part: KafkaRDDPartition,
+ context: TaskContext) extends NextIterator[R] {
+
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+ log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+ val kc = new KafkaCluster(kafkaParams)
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[V]]
+ val consumer = connectLeader
+ var requestOffset = part.fromOffset
+ var iter: Iterator[MessageAndOffset] = null
+
+ // The idea is to use the provided preferred host, except on task retry attempts,
+ // to minimize number of kafka metadata requests
+ private def connectLeader: SimpleConsumer = {
+ if (context.attemptNumber > 0) {
+ kc.connectLeader(part.topic, part.partition).fold(
+ errs => throw new SparkException(
+ s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
+ errs.mkString("\n")),
+ consumer => consumer
+ )
+ } else {
+ kc.connect(part.host, part.port)
+ }
+ }
+
+ private def handleFetchErr(resp: FetchResponse) {
+ if (resp.hasError) {
+ val err = resp.errorCode(part.topic, part.partition)
+ if (err == ErrorMapping.LeaderNotAvailableCode ||
+ err == ErrorMapping.NotLeaderForPartitionCode) {
+ log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+ s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ }
+ // Let normal rdd retry sort out reconnect attempts
+ throw ErrorMapping.exceptionFor(err)
+ }
+ }
+
+ private def fetchBatch: Iterator[MessageAndOffset] = {
+ val req = new FetchRequestBuilder()
+ .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
+ .build()
+ val resp = consumer.fetch(req)
+ handleFetchErr(resp)
+ // kafka may return a batch that starts before the requested offset
+ resp.messageSet(part.topic, part.partition)
+ .iterator
+ .dropWhile(_.offset < requestOffset)
+ }
+
+ override def close() = consumer.close()
+
+ override def getNext(): R = {
+ if (iter == null || !iter.hasNext) {
+ iter = fetchBatch
+ }
+ if (!iter.hasNext) {
+ assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ val item = iter.next()
+ if (item.offset >= part.untilOffset) {
+ assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ requestOffset = item.nextOffset
+ messageHandler(new MessageAndMetadata(
+ part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
+ }
+ }
+ }
+ }
+}
+
+private[spark]
+object KafkaRDD {
+ import KafkaCluster.LeaderOffset
+
+ /**
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+ * ending point of the batch
+ * @param messageHandler function for translating each message into the desired type
+ */
+ def apply[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ untilOffsets: Map[TopicAndPartition, LeaderOffset],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): KafkaRDD[K, V, U, T, R] = {
+ val leaders = untilOffsets.map { case (tp, lo) =>
+ tp -> (lo.host, lo.port)
+ }.toMap
+
+ val offsetRanges = fromOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }.toArray
+
+ new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+}
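
A sketch of the batch-oriented path through the KafkaUtils.createRDD wrapper added further down in KafkaUtils.scala, which looks up leaders itself via KafkaCluster.findLeaders. The broker, topic, and offsets are placeholders that would normally come from the caller's own offset store.

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    object KafkaRDDSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-sketch"))
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

        // Inclusive start, exclusive end, per topic/partition.
        val offsetRanges = Array(
          OffsetRange("events", 0, fromOffset = 0L, untilOffset = 100L),
          OffsetRange("events", 1, fromOffset = 0L, untilOffset = 100L))

        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
          sc, kafkaParams, offsetRanges)

        // Exactly (100 - 0) * 2 messages, provided those offsets exist in the log.
        println(rdd.count())
        sc.stop()
      }
    }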
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..36372e08f6
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.Partition
+
+/** @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+ * @param port preferred kafka host's port
+ */
+private[spark]
+class KafkaRDDPartition(
+ val index: Int,
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long,
+ val host: String,
+ val port: Int
+) extends Partition
+
+private[spark]
+object KafkaRDDPartition {
+ def apply(
+ index: Int,
+ topic: String,
+ partition: Int,
+ fromOffset: Long,
+ untilOffset: Long,
+ host: String,
+ port: Int
+ ): KafkaRDDPartition = new KafkaRDDPartition(
+ index,
+ topic,
+ partition,
+ fromOffset,
+ untilOffset,
+ host,
+ port
+ )
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index df725f0c65..f8aa6c5c62 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -23,12 +23,18 @@ import java.util.{Map => JMap}
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, StringDecoder}
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
object KafkaUtils {
/**
@@ -144,4 +150,174 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
+
+ /** A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ */
+ @Experimental
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): RDD[(K, V)] = {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+ val leaders = kc.findLeaders(topics).fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+
+ /** A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka leaders for each offset range in batch
+ * @param messageHandler function for translating each message into the desired type
+ */
+ @Experimental
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: Array[Leader],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): RDD[R] = {
+
+ val leaderMap = leaders
+ .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+ .toMap
+ new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+ }
+
+ /**
+ * This stream can guarantee that each message from Kafka is included in transformations
+ * (as opposed to output actions) exactly once, even in most failure situations.
+ *
+ * Points to note:
+ *
+ * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+ * as the fromOffsets parameter on restart.
+ * Kafka must have sufficient log retention to obtain messages after failure.
+ *
+ * Getting offsets from the stream - see programming guide
+ *
+ * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+ * that depend on Zookeeper, you must store offsets in ZK yourself.
+ *
+ * End-to-end semantics - This does not guarantee that any output operation will push each record
+ * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+ * outputting exactly once), you have to either ensure that the output operation is
+ * idempotent, or transactionally store offsets with the output. See the programming guide for
+ * more details.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param messageHandler function for translating each message into the desired type
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ */
+ @Experimental
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): InputDStream[R] = {
+ new DirectKafkaInputDStream[K, V, U, T, R](
+ ssc, kafkaParams, fromOffsets, messageHandler)
+ }
+
+ /**
+ * This stream can guarantee that each message from Kafka is included in transformations
+ * (as opposed to output actions) exactly once, even in most failure situations.
+ *
+ * Points to note:
+ *
+ * Failure Recovery - You must checkpoint this stream.
+ * Kafka must have sufficient log retention to obtain messages after failure.
+ *
+ * Getting offsets from the stream - see programming guide
+ *
+ * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+ * that depend on Zookeeper, you must store offsets in ZK yourself.
+ *
+ * End-to-end semantics - This does not guarantee that any output operation will push each record
+ * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+ * outputting exactly once), you have to ensure that the output operation is idempotent.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics names of the topics to consume
+ */
+ @Experimental
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): InputDStream[(K, V)] = {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+
+ (for {
+ topicPartitions <- kc.getPartitions(topics).right
+ leaderOffsets <- (if (reset == Some("smallest")) {
+ kc.getEarliestLeaderOffsets(topicPartitions)
+ } else {
+ kc.getLatestLeaderOffsets(topicPartitions)
+ }).right
+ } yield {
+ val fromOffsets = leaderOffsets.map { case (tp, lo) =>
+ (tp, lo.offset)
+ }
+ new DirectKafkaInputDStream[K, V, U, T, (K, V)](
+ ssc, kafkaParams, fromOffsets, messageHandler)
+ }).fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ }
}
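
A sketch of the explicit-offset createDirectStream overload above, resuming from offsets loaded out of the caller's own store (the broker and offset values are placeholders) and using a messageHandler that keeps each record's offset next to its payload.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ResumeFromOffsetsSketch {
      def resume(ssc: StreamingContext): Unit = {
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

        // In real use these come from wherever the previous run stored them.
        val fromOffsets = Map(
          TopicAndPartition("events", 0) -> 42L,
          TopicAndPartition("events", 1) -> 17L)

        // Keep (offset, message) so downstream output can store both together.
        val messageHandler =
          (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)

        val stream = KafkaUtils.createDirectStream[
          String, String, StringDecoder, StringDecoder, (Long, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)

        stream.foreachRDD { rdd =>
          rdd.foreachPartition { iter =>
            iter.foreach { case (offset, msg) => () } // stand-in for a transactional write
          }
        }
      }
    }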
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
new file mode 100644
index 0000000000..3454d92e72
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Host info for the leader of a Kafka TopicAndPartition */
+final class Leader private(
+ /** kafka topic name */
+ val topic: String,
+ /** kafka partition id */
+ val partition: Int,
+ /** kafka hostname */
+ val host: String,
+ /** kafka host's port */
+ val port: Int) extends Serializable
+
+object Leader {
+ def create(topic: String, partition: Int, host: String, port: Int): Leader =
+ new Leader(topic, partition, host, port)
+
+ def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
+ new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+
+ def apply(topic: String, partition: Int, host: String, port: Int): Leader =
+ new Leader(topic, partition, host, port)
+
+ def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
+ new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+
+}
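
A sketch of the createRDD overload that takes an Array[Leader]: when the leader host/port for each range is already known (cached from an earlier metadata lookup, say), passing it in lets createRDD skip the findLeaders round trip. Host, port, topic, and offsets below are placeholders.

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

    object KnownLeadersSketch {
      def messagesWithKnownLeaders(sc: SparkContext): RDD[String] = {
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
        val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))
        // One Leader per topic/partition in the batch; avoids another metadata request.
        val leaders = Array(Leader.create("events", 0, "broker1", 9092))
        KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
          sc, kafkaParams, offsetRanges, leaders,
          (mmd: MessageAndMetadata[String, String]) => mmd.message)
      }
    }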
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
new file mode 100644
index 0000000000..334c12e462
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+ def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+ /** kafka topic name */
+ val topic: String,
+ /** kafka partition id */
+ val partition: Int,
+ /** inclusive starting offset */
+ val fromOffset: Long,
+ /** exclusive ending offset */
+ val untilOffset: Long) extends Serializable {
+ import OffsetRange.OffsetRangeTuple
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[streaming]
+ def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+object OffsetRange {
+ def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def create(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def apply(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[spark]
+ type OffsetRangeTuple = (String, Int, Long, Long)
+
+ private[streaming]
+ def apply(t: OffsetRangeTuple) =
+ new OffsetRange(t._1, t._2, t._3, t._4)
+}
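
A sketch of the pattern HasOffsetRanges is meant to enable: after each batch is written out, record the untilOffset per topic/partition so a restart can feed those values back in as fromOffsets. The saveOffsets callback is a placeholder for whatever store is used (Zookeeper, a database, and so on); the cast succeeds because the RDDs produced by the direct stream are KafkaRDDs, which implement HasOffsetRanges.

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    object TrackOffsetsSketch {
      def trackOffsets(
          stream: InputDStream[(String, String)],
          saveOffsets: Map[TopicAndPartition, Long] => Unit): Unit = {
        stream.foreachRDD { rdd =>
          // Each generated RDD carries the offset ranges it was built from.
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition(_ => ()) // stand-in for the real per-partition output
          saveOffsets(ranges.map { o =>
            TopicAndPartition(o.topic, o.partition) -> o.untilOffset
          }.toMap)
        }
      }
    }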
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..e57c8f6987
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val topic = "kcsuitetopic" + Random.nextInt(10000)
+
+ val topicAndPartition = TopicAndPartition(topic, 0)
+
+ before {
+ setupKafka()
+ createTopic(topic)
+ produceAndSendMessage(topic, Map("a" -> 1))
+ }
+
+ after {
+ tearDownKafka()
+ }
+
+ test("metadata apis") {
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+ assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+ val parts = kc.getPartitions(Set(topic)).right.get
+ assert(parts(topicAndPartition), "didn't get partitions")
+ }
+
+ test("leader offset apis") {
+ val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+ val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+ }
+
+ test("consumer offset apis") {
+ val group = "kcsuitegroup" + Random.nextInt(10000)
+
+ val offset = Random.nextInt(10000)
+
+ val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+ assert(set.isRight, "didn't set consumer offsets")
+
+ val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+ assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
new file mode 100644
index 0000000000..0891ce344f
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ val brokerHost = "localhost"
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerHost:$brokerPort",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ var ssc: StreamingContext = _
+
+ before {
+ setupKafka()
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ tearDownKafka()
+ }
+
+ test("multi topic stream") {
+ val topics = Set("newA", "newB")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ createTopic(t)
+ produceAndSendMessage(t, data)
+ }
+ val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ var total = 0L;
+
+ stream.foreachRDD { rdd =>
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ val off = offsets(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ all.map { _ =>
+ (partSize, rangeSize)
+ }.toIterator
+ }.collect
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ total += collected.size
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(total === data.values.sum * topics.size, "didn't get all messages")
+ }
+ ssc.stop()
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..9b9e3f5fce
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ var sc: SparkContext = _
+ before {
+ setupKafka()
+ }
+
+ after {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+ tearDownKafka()
+ }
+
+ test("Kafka RDD") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ sc = new SparkContext(sparkConf)
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ val rdd = getRdd(kc, Set(topic))
+ // this is the "lots of messages" case
+ // make sure we get all of them
+ assert(rdd.isDefined)
+ assert(rdd.get.count === sent.values.sum)
+
+ kc.setConsumerOffsets(
+ kafkaParams("group.id"),
+ rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+
+ val rdd2 = getRdd(kc, Set(topic))
+ val sent2 = Map("d" -> 1)
+ produceAndSendMessage(topic, sent2)
+ // this is the "0 messages" case
+ // make sure we don't get anything, since messages were sent after rdd was defined
+ assert(rdd2.isDefined)
+ assert(rdd2.get.count === 0)
+
+ val rdd3 = getRdd(kc, Set(topic))
+ produceAndSendMessage(topic, Map("extra" -> 22))
+ // this is the "exactly 1 message" case
+ // make sure we get exactly one message, despite there being lots more available
+ assert(rdd3.isDefined)
+ assert(rdd3.get.count === sent2.values.sum)
+
+ }
+
+ // get an rdd from the committed consumer offsets until the latest leader offsets,
+ private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+ val groupId = kc.kafkaParams("group.id")
+ for {
+ topicPartitions <- kc.getPartitions(topics).right.toOption
+ from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+ offs.map(kv => kv._1 -> kv._2.offset)
+ }
+ )
+ until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+ } yield {
+ KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+ }
+ }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 0817c56d8f..f207dc6d4f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
private var zkPort: Int = 0
- private var brokerPort = 9092
+ protected var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
def createTopic(topic: String) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(
- server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+ server.apis.metadataCache.containsTopicAndPartition(topic, partition),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
}