path: root/external/kafka-0-8
author      cody koeninger <cody@koeninger.org>        2016-05-11 12:15:41 -0700
committer   Reynold Xin <rxin@databricks.com>          2016-05-11 12:15:41 -0700
commit      89e67d6667d5f8be9c6fb6c120fbcd350ae2950d (patch)
tree        670699f20dcc785e1889c8c1afc4db1a0b2b11ee /external/kafka-0-8
parent      6d0368ab8d1043735e5fe89f801aae1c6826876c (diff)
download    spark-89e67d6667d5f8be9c6fb6c120fbcd350ae2950d.tar.gz
            spark-89e67d6667d5f8be9c6fb6c120fbcd350ae2950d.tar.bz2
            spark-89e67d6667d5f8be9c6fb6c120fbcd350ae2950d.zip
[SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
## What changes were proposed in this pull request?

Renaming the streaming-kafka artifact to include the kafka version, in anticipation of needing a different artifact for later kafka versions.

## How was this patch tested?

Unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #12946 from koeninger/SPARK-15085.
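For downstream builds, the renamed artifact would be declared roughly like this (a sketch in sbt syntax; the 2.0.0-SNAPSHOT version comes from the pom.xml in this patch, and %% appends the Scala binary suffix, e.g. _2.11):

    // hypothetical sbt dependency on the renamed artifact
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-SNAPSHOT"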
Diffstat (limited to 'external/kafka-0-8')
 -rw-r--r--  external/kafka-0-8/pom.xml                                                                        |  98
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala                   |  66
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala  | 227
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala             | 425
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala        | 142
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala                 | 269
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala        |  42
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala           | 275
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala               | 805
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala              | 109
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala    | 302
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java              |  21
 -rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala                  |  23
 -rw-r--r--  external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 175
 -rw-r--r--  external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java          | 156
 -rw-r--r--  external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java       | 135
 -rw-r--r--  external/kafka-0-8/src/test/resources/log4j.properties                                            |  28
 -rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala   | 531
 -rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala        |  81
 -rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala            | 175
 -rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala         |  84
 -rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 148
 22 files changed, 4317 insertions, 0 deletions
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
new file mode 100644
index 0000000000..cccfda3c61
--- /dev/null
+++ b/external/kafka-0-8/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-kafka-0-8</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Integration for Kafka 0.8</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>0.8.2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ <version>3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
new file mode 100644
index 0000000000..9159051ba0
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
+ */
+final class Broker private(
+ /** Broker's hostname */
+ val host: String,
+ /** Broker's port */
+ val port: Int) extends Serializable {
+ override def equals(obj: Any): Boolean = obj match {
+ case that: Broker =>
+ this.host == that.host &&
+ this.port == that.port
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ 41 * (41 + host.hashCode) + port
+ }
+
+ override def toString(): String = {
+ s"Broker($host, $port)"
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Broker]].
+ */
+@Experimental
+object Broker {
+ def create(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def apply(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def unapply(broker: Broker): Option[(String, Int)] = {
+ if (broker == null) {
+ None
+ } else {
+ Some((broker.host, broker.port))
+ }
+ }
+}
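A one-line sketch of constructing a Broker via the companion object above (host and port are placeholder assumptions; a Broker is typically supplied to KafkaUtils.createRDD as a leader hint):

    import org.apache.spark.streaming.kafka.Broker

    // placeholder host/port, for illustration only
    val leader = Broker("localhost", 9092)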
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..fb58ed7898
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[K]: ClassTag,
+ T <: Decoder[V]: ClassTag,
+ R: ClassTag](
+ _ssc: StreamingContext,
+ val kafkaParams: Map[String, String],
+ val fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ) extends InputDStream[R](_ssc) with Logging {
+ val maxRetries = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRetries", 1)
+
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+
+ /**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
+ protected val kc = new KafkaCluster(kafkaParams)
+
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+
+ protected[streaming] def maxMessagesPerPartition(
+ offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+ // calculate a per-partition rate limit based on current lag
+ val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+ case Some(rate) =>
+ val lagPerPartition = offsets.map { case (tp, offset) =>
+ tp -> Math.max(offset - currentOffsets(tp), 0)
+ }
+ val totalLag = lagPerPartition.values.sum
+
+ lagPerPartition.map { case (tp, lag) =>
+ val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ tp -> (if (maxRateLimitPerPartition > 0) {
+ Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+ }
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ }
+
+ if (effectiveRateLimitPerPartition.values.sum > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some(effectiveRateLimitPerPartition.map {
+ case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ })
+ } else {
+ None
+ }
+ }
+
+ protected var currentOffsets = fromOffsets
+
+ @tailrec
+ protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
+ val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+ // Either.fold would confuse @tailrec, do it manually
+ if (o.isLeft) {
+ val err = o.left.get.toString
+ if (retries <= 0) {
+ throw new SparkException(err)
+ } else {
+ log.error(err)
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ latestLeaderOffsets(retries - 1)
+ }
+ } else {
+ o.right.get
+ }
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
+ val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+ maxMessagesPerPartition(offsets).map { mmp =>
+ mmp.map { case (tp, messages) =>
+ val lo = leaderOffsets(tp)
+ tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
+ }
+ }.getOrElse(leaderOffsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
+ val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
+ val rdd = KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+
+ // Report the record number and metadata of this batch interval to InputInfoTracker.
+ val offsetRanges = currentOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to immutable.List to prevent from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+ currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ }
+
+ def stop(): Unit = {
+ }
+
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
+
+ override def update(time: Time) {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time) { }
+
+ override def restore() {
+ // this is assuming that the topics don't change during execution, which is true currently
+ val topics = fromOffsets.keySet
+ val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
+
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+ }
+ }
+ }
+
+ /**
+ * A RateController to retrieve the rate from RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
+}
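DirectKafkaInputDStream itself is private[streaming]; applications would normally obtain it through KafkaUtils.createDirectStream, added later in this patch. A minimal usage sketch, assuming a broker at localhost:9092 and a topic named "test":

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("DirectKafkaExample")
    val ssc = new StreamingContext(conf, Seconds(5))
    // the direct stream needs broker addresses ("metadata.broker.list"), not ZooKeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test"))
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()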
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
new file mode 100644
index 0000000000..726b5d8ec3
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+@DeveloperApi
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+ import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
+
+ // ConsumerConfig isn't serializable
+ @transient private var _config: SimpleConsumerConfig = null
+
+ def config: SimpleConsumerConfig = this.synchronized {
+ if (_config == null) {
+ _config = SimpleConsumerConfig(kafkaParams)
+ }
+ _config
+ }
+
+ def connect(host: String, port: Int): SimpleConsumer =
+ new SimpleConsumer(host, port, config.socketTimeoutMs,
+ config.socketReceiveBufferBytes, config.clientId)
+
+ def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+ findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+ // Metadata api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+ // scalastyle:on
+
+ def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+ val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+ 0, config.clientId, Seq(topic))
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.find(_.partitionId == partition)
+ }.foreach { pm: PartitionMetadata =>
+ pm.leader.foreach { leader =>
+ return Right((leader.host, leader.port))
+ }
+ }
+ }
+ Left(errs)
+ }
+
+ def findLeaders(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+ val topics = topicAndPartitions.map(_.topic)
+ val response = getPartitionMetadata(topics).right
+ val answer = response.flatMap { tms: Set[TopicMetadata] =>
+ val leaderMap = tms.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+ val tp = TopicAndPartition(tm.topic, pm.partitionId)
+ if (topicAndPartitions(tp)) {
+ pm.leader.map { l =>
+ tp -> (l.host -> l.port)
+ }
+ } else {
+ None
+ }
+ }
+ }.toMap
+
+ if (leaderMap.keys.size == topicAndPartitions.size) {
+ Right(leaderMap)
+ } else {
+ val missing = topicAndPartitions.diff(leaderMap.keySet)
+ val err = new Err
+ err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+ Left(err)
+ }
+ }
+ answer
+ }
+
+ def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
+ getPartitionMetadata(topics).right.map { r =>
+ r.flatMap { tm: TopicMetadata =>
+ tm.partitionsMetadata.map { pm: PartitionMetadata =>
+ TopicAndPartition(tm.topic, pm.partitionId)
+ }
+ }
+ }
+ }
+
+ def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+ val req = TopicMetadataRequest(
+ TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp: TopicMetadataResponse = consumer.send(req)
+ val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
+
+ if (respErrs.isEmpty) {
+ return Right(resp.topicsMetadata.toSet)
+ } else {
+ respErrs.foreach { m =>
+ val cause = ErrorMapping.exceptionFor(m.errorCode)
+ val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
+ errs.append(new SparkException(msg, cause))
+ }
+ }
+ }
+ Left(errs)
+ }
+
+ // Leader offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+ // scalastyle:on
+
+ def getLatestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
+
+ def getEarliestLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+ getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long
+ ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
+ getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+ r.map { kv =>
+ // mapValues isn't serializable, see SI-7005
+ kv._1 -> kv._2.head
+ }
+ }
+ }
+
+ private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
+ m.groupBy(_._2).map { kv =>
+ kv._1 -> kv._2.keys.toSeq
+ }
+
+ def getLeaderOffsets(
+ topicAndPartitions: Set[TopicAndPartition],
+ before: Long,
+ maxNumOffsets: Int
+ ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
+ findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
+ val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
+ val leaders = leaderToTp.keys
+ var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
+ val errs = new Err
+ withBrokers(leaders, errs) { consumer =>
+ val partitionsToGetOffsets: Seq[TopicAndPartition] =
+ leaderToTp((consumer.host, consumer.port))
+ val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
+ tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
+ }.toMap
+ val req = OffsetRequest(reqMap)
+ val resp = consumer.getOffsetsBefore(req)
+ val respMap = resp.partitionErrorAndOffsets
+ partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+ if (por.error == ErrorMapping.NoError) {
+ if (por.offsets.nonEmpty) {
+ result += tp -> por.offsets.map { off =>
+ LeaderOffset(consumer.host, consumer.port, off)
+ }
+ } else {
+ errs.append(new SparkException(
+ s"Empty offsets for ${tp}, is ${before} before log beginning?"))
+ }
+ } else {
+ errs.append(ErrorMapping.exceptionFor(por.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
+ Left(errs)
+ }
+ }
+
+ // Consumer offset api
+ // scalastyle:off
+ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ // scalastyle:on
+
+ // this 0 here indicates api version, in this case the original ZK backed api.
+ private def defaultConsumerApiVersion: Short = 0
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, Long]] =
+ getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Long]] = {
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
+ r.map { kv =>
+ kv._1 -> kv._2.offset
+ }
+ }
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+ var result = Map[TopicAndPartition, OffsetMetadataAndError]()
+ val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
+ val errs = new Err
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.fetchOffsets(req)
+ val respMap = resp.requestInfo
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+ if (ome.error == ErrorMapping.NoError) {
+ result += tp -> ome
+ } else {
+ errs.append(ErrorMapping.exceptionFor(ome.error))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
+ Left(errs)
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ val meta = offsets.map { kv =>
+ kv._1 -> OffsetAndMetadata(kv._2)
+ }
+ setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
+ }
+
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata],
+ consumerApiVersion: Short
+ ): Either[Err, Map[TopicAndPartition, Short]] = {
+ var result = Map[TopicAndPartition, Short]()
+ val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
+ val errs = new Err
+ val topicAndPartitions = metadata.keySet
+ withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+ val resp = consumer.commitOffsets(req)
+ val respMap = resp.commitStatus
+ val needed = topicAndPartitions.diff(result.keySet)
+ needed.foreach { tp: TopicAndPartition =>
+ respMap.get(tp).foreach { err: Short =>
+ if (err == ErrorMapping.NoError) {
+ result += tp -> err
+ } else {
+ errs.append(ErrorMapping.exceptionFor(err))
+ }
+ }
+ }
+ if (result.keys.size == topicAndPartitions.size) {
+ return Right(result)
+ }
+ }
+ val missing = topicAndPartitions.diff(result.keySet)
+ errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
+ Left(errs)
+ }
+
+ // Try a call against potentially multiple brokers, accumulating errors
+ private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
+ (fn: SimpleConsumer => Any): Unit = {
+ brokers.foreach { hp =>
+ var consumer: SimpleConsumer = null
+ try {
+ consumer = connect(hp._1, hp._2)
+ fn(consumer)
+ } catch {
+ case NonFatal(e) =>
+ errs.append(e)
+ } finally {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+ }
+ }
+}
+
+@DeveloperApi
+object KafkaCluster {
+ type Err = ArrayBuffer[Throwable]
+
+ /** If the result is right, return it, otherwise throw SparkException */
+ def checkErrors[T](result: Either[Err, T]): T = {
+ result.fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ }
+
+ case class LeaderOffset(host: String, port: Int, offset: Long)
+
+ /**
+ * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
+ * Simple consumers connect directly to brokers, but need many of the same configs.
+ * This subclass won't warn about missing ZK params, or presence of broker params.
+ */
+ class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
+ extends ConsumerConfig(originalProps) {
+ val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
+ val hpa = hp.split(":")
+ if (hpa.size == 1) {
+ throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+ }
+ (hpa(0), hpa(1).toInt)
+ }
+ }
+
+ object SimpleConsumerConfig {
+ /**
+ * Make a consumer config without requiring group.id or zookeeper.connect,
+ * since communicating with brokers also needs common settings such as timeout
+ */
+ def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
+ // These keys are from other pre-existing kafka configs for specifying brokers, accept either
+ val brokers = kafkaParams.get("metadata.broker.list")
+ .orElse(kafkaParams.get("bootstrap.servers"))
+ .getOrElse(throw new SparkException(
+ "Must specify metadata.broker.list or bootstrap.servers"))
+
+ val props = new Properties()
+ kafkaParams.foreach { case (key, value) =>
+ // prevent warnings on parameters ConsumerConfig doesn't know about
+ if (key != "metadata.broker.list" && key != "bootstrap.servers") {
+ props.put(key, value)
+ }
+ }
+
+ Seq("zookeeper.connect", "group.id").foreach { s =>
+ if (!props.containsKey(s)) {
+ props.setProperty(s, "")
+ }
+ }
+
+ new SimpleConsumerConfig(brokers, props)
+ }
+ }
+}
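As a sketch of how the DeveloperApi above might be exercised directly (the broker address and topic name are assumptions), the Either-based methods compose with KafkaCluster.checkErrors:

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
    val partitions = Set(TopicAndPartition("test", 0))
    // checkErrors throws a SparkException if the lookup accumulated errors
    val latest = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))
    latest.foreach { case (tp, lo) =>
      println(s"$tp -> offset ${lo.offset} (leader ${lo.host}:${lo.port})")
    }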
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
new file mode 100644
index 0000000000..3713bda41b
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.Map
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param kafkaParams Map of kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ _ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ useReliableReceiver: Boolean,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
+
+ def getReceiver(): Receiver[(K, V)] = {
+ if (!useReliableReceiver) {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ } else {
+ new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ }
+ }
+}
+
+private[streaming]
+class KafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends Receiver[(K, V)](storageLevel) with Logging {
+
+ // Connection to Kafka
+ var consumerConnector: ConsumerConnector = null
+
+ def onStop() {
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+ }
+
+ def onStart() {
+
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
+
+ // Kafka connection properties
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ val zkConnect = kafkaParams("zookeeper.connect")
+ // Create the connection to the cluster
+ logInfo("Connecting to Zookeeper: " + zkConnect)
+ val consumerConfig = new ConsumerConfig(props)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + zkConnect)
+
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ // Create threads for each topic/message Stream we are listening
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ val executorPool =
+ ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+ try {
+ // Start the messages handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ } finally {
+ executorPool.shutdown() // Just causes threads to terminate after work is done
+ }
+ }
+
+ // Handles Kafka messages
+ private class MessageHandler(stream: KafkaStream[K, V])
+ extends Runnable {
+ def run() {
+ logInfo("Starting MessageHandler.")
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext()) {
+ val msgAndMetadata = streamIterator.next()
+ store((msgAndMetadata.key, msgAndMetadata.message))
+ }
+ } catch {
+ case e: Throwable => reportError("Error handling message; exiting", e)
+ }
+ }
+ }
+}
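The receiver-based stream above is normally created through KafkaUtils.createStream. A minimal sketch, assuming ZooKeeper at localhost:2181 and a topic named "test" consumed with one thread:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaReceiverExample"), Seconds(5))
    // topics map: topic name -> number of consumer threads for that topic
    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-group",
      Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()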
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
new file mode 100644
index 0000000000..d4881b140d
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.SimpleConsumer
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[kafka]
+class KafkaRDD[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag] private[spark] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ val offsetRanges: Array[OffsetRange],
+ leaders: Map[TopicAndPartition, (String, Int)],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map { case (o, i) =>
+ val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+ new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+ }.toArray
+ }
+
+ override def count(): Long = offsetRanges.map(_.count).sum
+
+ override def countApprox(
+ timeout: Long,
+ confidence: Double = 0.95
+ ): PartialResult[BoundedDouble] = {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
+
+ override def isEmpty(): Boolean = count == 0L
+
+ override def take(num: Int): Array[R] = {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
+
+ if (num < 1 || nonEmptyPartitions.isEmpty) {
+ return new Array[R](0)
+ }
+
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.count)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ val buf = new ArrayBuffer[R]
+ val res = context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
+ parts.keys.toArray)
+ res.foreach(buf ++= _)
+ buf.toArray
+ }
+
+ override def getPreferredLocations(thePart: Partition): Seq[String] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ // TODO is additional hostname resolution necessary here
+ Seq(part.host)
+ }
+
+ private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+ s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+ private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+ s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates that messages may have been lost"
+
+ private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+ s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates a message may have been skipped"
+
+ override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ if (part.fromOffset == part.untilOffset) {
+ log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+ s"skipping ${part.topic} ${part.partition}")
+ Iterator.empty
+ } else {
+ new KafkaRDDIterator(part, context)
+ }
+ }
+
+ private class KafkaRDDIterator(
+ part: KafkaRDDPartition,
+ context: TaskContext) extends NextIterator[R] {
+
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+ log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+ val kc = new KafkaCluster(kafkaParams)
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(kc.config.props)
+ .asInstanceOf[Decoder[V]]
+ val consumer = connectLeader
+ var requestOffset = part.fromOffset
+ var iter: Iterator[MessageAndOffset] = null
+
+ // The idea is to use the provided preferred host, except on task retry attempts,
+ // to minimize number of kafka metadata requests
+ private def connectLeader: SimpleConsumer = {
+ if (context.attemptNumber > 0) {
+ kc.connectLeader(part.topic, part.partition).fold(
+ errs => throw new SparkException(
+ s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
+ errs.mkString("\n")),
+ consumer => consumer
+ )
+ } else {
+ kc.connect(part.host, part.port)
+ }
+ }
+
+ private def handleFetchErr(resp: FetchResponse) {
+ if (resp.hasError) {
+ val err = resp.errorCode(part.topic, part.partition)
+ if (err == ErrorMapping.LeaderNotAvailableCode ||
+ err == ErrorMapping.NotLeaderForPartitionCode) {
+ log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+ s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
+ }
+ // Let normal rdd retry sort out reconnect attempts
+ throw ErrorMapping.exceptionFor(err)
+ }
+ }
+
+ private def fetchBatch: Iterator[MessageAndOffset] = {
+ val req = new FetchRequestBuilder()
+ .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
+ .build()
+ val resp = consumer.fetch(req)
+ handleFetchErr(resp)
+ // kafka may return a batch that starts before the requested offset
+ resp.messageSet(part.topic, part.partition)
+ .iterator
+ .dropWhile(_.offset < requestOffset)
+ }
+
+ override def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+
+ override def getNext(): R = {
+ if (iter == null || !iter.hasNext) {
+ iter = fetchBatch
+ }
+ if (!iter.hasNext) {
+ assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ val item = iter.next()
+ if (item.offset >= part.untilOffset) {
+ assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ requestOffset = item.nextOffset
+ messageHandler(new MessageAndMetadata(
+ part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
+ }
+ }
+ }
+ }
+}
+
+private[kafka]
+object KafkaRDD {
+ import KafkaCluster.LeaderOffset
+
+ /**
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+ * ending point of the batch
+ * @param messageHandler function for translating each message into the desired type
+ */
+ def apply[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag,
+ R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ untilOffsets: Map[TopicAndPartition, LeaderOffset],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): KafkaRDD[K, V, U, T, R] = {
+ val leaders = untilOffsets.map { case (tp, lo) =>
+ tp -> (lo.host, lo.port)
+ }.toMap
+
+ val offsetRanges = fromOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }.toArray
+
+ new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+}
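KafkaRDD is private[kafka]; batch consumers would typically go through KafkaUtils.createRDD with explicit OffsetRanges. A sketch under the same assumed broker and topic names:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDExample"))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // read offsets [0, 100) of partition 0 of topic "test"
    val offsetRanges = Array(OffsetRange("test", 0, 0, 100))
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(rdd.count())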
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..02917becf0
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.Partition
+
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+ * @param port preferred kafka host's port
+ */
+private[kafka]
+class KafkaRDDPartition(
+ val index: Int,
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long,
+ val host: String,
+ val port: Int
+) extends Partition {
+ /** Number of messages this partition refers to */
+ def count(): Long = untilOffset - fromOffset
+}
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
new file mode 100644
index 0000000000..d9d4240c05
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
+ */
+private[kafka] class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkClient: ZkClient = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: Producer[String, String] = _
+
+ // Flag to test whether the system is correctly started
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkClient = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkClient).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ ZKStringSerializer)
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ (server, port)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Teardown the whole servers, including Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
+
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkClient, topic, partitions, 1)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
+ }
+
+ /** Single-argument version for backwards compatibility */
+ def createTopic(topic: String): Unit = createTopic(topic, 1)
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Unit = {
+ producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+ producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+ producer.close()
+ producer = null
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("metadata.broker.list", brokerAddress)
+ props.put("serializer.class", classOf[StringEncoder].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("request.required.acks", "-1")
+ props
+ }
+
+ // A simplified version of ScalaTest's `eventually`, rewritten here to avoid adding an extra
+ // test dependency
+ def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+ def makeAttempt(): Either[Throwable, T] = {
+ try {
+ Right(func)
+ } catch {
+ case e if NonFatal(e) => Left(e)
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ @tailrec
+ def tryAgain(attempt: Int): T = {
+ makeAttempt() match {
+ case Right(result) => result
+ case Left(e) =>
+ val duration = System.currentTimeMillis() - startTime
+ if (duration < timeout.milliseconds) {
+ Thread.sleep(interval.milliseconds)
+ } else {
+ throw new TimeoutException(e.getMessage)
+ }
+
+ tryAgain(attempt + 1)
+ }
+ }
+
+ tryAgain(1)
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ Utils.deleteRecursively(snapshotDir)
+ Utils.deleteRecursively(logDir)
+ }
+ }
+}
+
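Usage note: KafkaTestUtils is a test-only helper living in the org.apache.spark.streaming.kafka package, so the sketch below assumes it is driven from a suite in that same package. The topic name and message frequencies are illustrative values, not taken from the patch.
{{{
// Minimal sketch of driving the embedded Zookeeper/Kafka fixture added above.
val kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()                          // starts embedded Zookeeper, then the Kafka broker
try {
  kafkaTestUtils.createTopic("test-topic", 2)   // create a 2-partition topic and wait for metadata
  kafkaTestUtils.sendMessages("test-topic", Map("a" -> 5, "b" -> 3))
  val brokers = kafkaTestUtils.brokerAddress    // "host:port", suitable for metadata.broker.list
  // ... exercise the code under test against `brokers` ...
} finally {
  kafkaTestUtils.teardown()                     // shuts down the broker, then Zookeeper
}
}}}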
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000000..edaafb912c
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.OutputStream
+import java.lang.{Integer => JInt, Long => JLong}
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList, Map => JMap, Set => JSet}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
+import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java._
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+
+object KafkaUtils {
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param ssc StreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param ssc StreamingContext object
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[(K, V)] = {
+ val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
+ new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairReceiverInputDStream[String, String] = {
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairReceiverInputDStream[String, String] = {
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from Kafka Brokers.
+ * @param jssc JavaStreamingContext object
+ * @param keyTypeClass Key type of DStream
+ * @param valueTypeClass Value type of DStream
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel RDD storage level.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ jssc: JavaStreamingContext,
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairReceiverInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
+
+ implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+ implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
+
+ createStream[K, V, U, T](
+ jssc.ssc,
+ kafkaParams.asScala.toMap,
+ Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /** get leaders for the given offset ranges, or throw an exception */
+ private def leadersForRanges(
+ kc: KafkaCluster,
+ offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+ val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+ val leaders = kc.findLeaders(topics)
+ KafkaCluster.checkErrors(leaders)
+ }
+
+ /** Make sure offsets are available in kafka, or throw an exception */
+ private def checkOffsets(
+ kc: KafkaCluster,
+ offsetRanges: Array[OffsetRange]): Unit = {
+ val topics = offsetRanges.map(_.topicAndPartition).toSet
+ val result = for {
+ low <- kc.getEarliestLeaderOffsets(topics).right
+ high <- kc.getLatestLeaderOffsets(topics).right
+ } yield {
+ offsetRanges.filterNot { o =>
+ low(o.topicAndPartition).offset <= o.fromOffset &&
+ o.untilOffset <= high(o.topicAndPartition).offset
+ }
+ }
+ val badRanges = KafkaCluster.checkErrors(result)
+ if (!badRanges.isEmpty) {
+ throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
+ }
+ }
+
+ private[kafka] def getFromOffsets(
+ kc: KafkaCluster,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): Map[TopicAndPartition, Long] = {
+ val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+ val result = for {
+ topicPartitions <- kc.getPartitions(topics).right
+ leaderOffsets <- (if (reset == Some("smallest")) {
+ kc.getEarliestLeaderOffsets(topicPartitions)
+ } else {
+ kc.getLatestLeaderOffsets(topicPartitions)
+ }).right
+ } yield {
+ leaderOffsets.map { case (tp, lo) =>
+ (tp, lo.offset)
+ }
+ }
+ KafkaCluster.checkErrors(result)
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition.
+ *
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
+ */
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): RDD[(K, V)] = sc.withScope {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val leaders = leadersForRanges(kc, offsetRanges)
+ checkOffsets(kc, offsetRanges)
+ new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
+ */
+ def createRDD[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: Map[TopicAndPartition, Broker],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): RDD[R] = sc.withScope {
+ val kc = new KafkaCluster(kafkaParams)
+ val leaderMap = if (leaders.isEmpty) {
+ leadersForRanges(kc, offsetRanges)
+ } else {
+ // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
+ leaders.map {
+ case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+ }
+ }
+ val cleanedHandler = sc.clean(messageHandler)
+ checkOffsets(kc, offsetRanges)
+ new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param keyClass type of Kafka message key
+ * @param valueClass type of Kafka message value
+ * @param keyDecoderClass type of Kafka message key decoder
+ * @param valueDecoderClass type of Kafka message value decoder
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): JavaPairRDD[K, V] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ new JavaPairRDD(createRDD[K, V, KD, VD](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaRDD[R] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val leaderMap = Map(leaders.asScala.toSeq: _*)
+ createRDD[K, V, KD, VD, R](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_))
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offsets can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
+ */
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicAndPartition, Long],
+ messageHandler: MessageAndMetadata[K, V] => R
+ ): InputDStream[R] = {
+ val cleanedHandler = ssc.sc.clean(messageHandler)
+ new DirectKafkaInputDStream[K, V, KD, VD, R](
+ ssc, kafkaParams, fromOffsets, cleanedHandler)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offsets can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createDirectStream[
+ K: ClassTag,
+ V: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag] (
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): InputDStream[(K, V)] = {
+ val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+ val kc = new KafkaCluster(kafkaParams)
+ val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
+ new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
+ ssc, kafkaParams, fromOffsets, messageHandler)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offsets can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param recordClass Class of the records in DStream
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
+ */
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaInputDStream[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
+ createDirectStream[K, V, KD, VD, R](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
+ cleanedHandler
+ )
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offsets can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
+ */
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ topics: JSet[String]
+ ): JavaPairInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ createDirectStream[K, V, KD, VD](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Set(topics.asScala.toSeq: _*)
+ )
+ }
+}
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and functions so that they can be easily
+ * instantiated and called from Python's KafkaUtils.
+ *
+ * The zero-arg constructor helps instantiate this class from the Class object via
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
+ * takes care of known parameters instead of requiring them to be passed from Python.
+ */
+private[kafka] class KafkaUtilsPythonHelper {
+ import KafkaUtilsPythonHelper._
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+ KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+ jssc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ kafkaParams,
+ topics,
+ storageLevel)
+ }
+
+ def createRDDWithoutMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
+ new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler))
+ }
+
+ def createRDDWithMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
+ val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
+ new PythonMessageAndMetadata(
+ mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
+ val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaRDD(rdd)
+ }
+
+ private def createRDD[V: ClassTag](
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker],
+ messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = {
+ KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+ jsc.sc,
+ kafkaParams.asScala.toMap,
+ offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+ leaders.asScala.toMap,
+ messageHandler
+ )
+ }
+
+ def createDirectStreamWithoutMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
+ new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
+ }
+
+ def createDirectStreamWithMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+ val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
+ new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
+ val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaDStream(stream)
+ }
+
+ private def createDirectStream[V: ClassTag](
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
+
+ val currentFromOffsets = if (!fromOffsets.isEmpty) {
+ val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
+ if (topicsFromOffsets != topics.asScala.toSet) {
+ throw new IllegalStateException(
+ s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
+ s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
+ }
+ Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
+ } else {
+ val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*))
+ KafkaUtils.getFromOffsets(
+ kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
+ }
+
+ KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(currentFromOffsets.toSeq: _*),
+ messageHandler)
+ }
+
+ def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong
+ ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+ def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+ TopicAndPartition(topic, partition)
+
+ def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+ def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+ val parentRDDs = rdd.getNarrowAncestors
+ val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
+
+ require(
+ kafkaRDDs.length == 1,
+ "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
+ "with this RDD, please call this method only on a Kafka RDD.")
+
+ val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
+ kafkaRDD.offsetRanges.toSeq.asJava
+ }
+}
+
+private object KafkaUtilsPythonHelper {
+ private var initialized = false
+
+ def initialize(): Unit = {
+ SerDeUtil.initialize()
+ synchronized {
+ if (!initialized) {
+ new PythonMessageAndMetadataPickler().register()
+ initialized = true
+ }
+ }
+ }
+
+ initialize()
+
+ def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
+ new SerDeUtil.AutoBatchedPickler(iter)
+ }
+
+ case class PythonMessageAndMetadata(
+ topic: String,
+ partition: JInt,
+ offset: JLong,
+ key: Array[Byte],
+ message: Array[Byte])
+
+ class PythonMessageAndMetadataPickler extends IObjectPickler {
+ private val module = "pyspark.streaming.kafka"
+
+ def register(): Unit = {
+ Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
+ Pickler.registerCustomPickler(this.getClass, this)
+ }
+
+ def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
+ if (obj == this) {
+ out.write(Opcodes.GLOBAL)
+ out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
+ } else {
+ pickler.save(this)
+ val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
+ out.write(Opcodes.MARK)
+ pickler.save(msgAndMetaData.topic)
+ pickler.save(msgAndMetaData.partition)
+ pickler.save(msgAndMetaData.offset)
+ pickler.save(msgAndMetaData.key)
+ pickler.save(msgAndMetaData.message)
+ out.write(Opcodes.TUPLE)
+ out.write(Opcodes.REDUCE)
+ }
+ }
+ }
+}
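For orientation, here is a hedged sketch of how the direct API defined above is typically wired up from Scala; the application name, master, broker list, topics, and batch interval are illustrative values, not part of this patch.
{{{
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Illustrative setup: local master and a 2-second batch interval.
val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaExample")
val ssc = new StreamingContext(conf, Seconds(2))
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",  // Kafka brokers, NOT Zookeeper
  "auto.offset.reset" -> "smallest")                      // start from the earliest available offsets
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topicA", "topicB"))

stream.foreachRDD { rdd =>
  // Each batch RDD is a KafkaRDD, so its offset ranges can be read back for bookkeeping.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
ssc.start()
ssc.awaitTermination()
}}}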
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
new file mode 100644
index 0000000000..d9b856e469
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+trait HasOffsetRanges {
+ def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long) extends Serializable {
+ import OffsetRange.OffsetRangeTuple
+
+ /** Kafka TopicAndPartition object, for convenience */
+ def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
+
+ /** Number of messages this OffsetRange refers to */
+ def count(): Long = untilOffset - fromOffset
+
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+ }
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[streaming]
+ def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+ def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def create(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def apply(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[kafka]
+ type OffsetRangeTuple = (String, Int, Long, Long)
+
+ private[kafka]
+ def apply(t: OffsetRangeTuple) =
+ new OffsetRange(t._1, t._2, t._3, t._4)
+}
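A short sketch of the batch-oriented counterpart: building OffsetRange instances and reading them back with KafkaUtils.createRDD. The master, broker address, topic, and offsets below are illustrative assumptions.
{{{
import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("KafkaRDDExample"))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// fromOffset is inclusive and untilOffset exclusive, so each range below covers 100 messages.
val offsetRanges = Array(
  OffsetRange("pageviews", partition = 0, fromOffset = 0L, untilOffset = 100L),
  OffsetRange("pageviews", partition = 1, fromOffset = 0L, untilOffset = 100L))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
// The message count equals the sum of the ranges' counts (200 here) if those offsets exist.
println(s"Read ${rdd.count()} messages")
}}}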
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000000..39abe3c3e2
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+
+import scala.collection.{mutable, Map}
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored in the write-ahead log. Offsets will only be updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off the automatic
+ * offset commit mechanism in the Kafka consumer, so setting this configuration manually within
+ * kafkaParams will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel)
+ extends Receiver[(K, V)](storageLevel) with Logging {
+
+ private val groupId = kafkaParams("group.id")
+ private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+ private def conf = SparkEnv.get.conf
+
+ /** High level consumer to connect to Kafka. */
+ private var consumerConnector: ConsumerConnector = null
+
+ /** zkClient to connect to Zookeeper to commit the offsets. */
+ private var zkClient: ZkClient = null
+
+ /**
+ * A HashMap to manage the offset for each topic/partition. This HashMap is only accessed in
+ * synchronized blocks, so the mutable HashMap will not run into concurrency issues.
+ */
+ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+ private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+ /**
+ * Manage the BlockGenerator in the receiver itself to better coordinate block storing and
+ * offset committing.
+ */
+ private var blockGenerator: BlockGenerator = null
+
+ /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
+ private var messageHandlerThreadPool: ThreadPoolExecutor = null
+
+ override def onStart(): Unit = {
+ logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+ // Initialize the topic-partition / offset hash map.
+ topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+ // Initialize the stream block id / offset snapshot hash map.
+ blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
+
+ // Initialize the block generator for storing Kafka messages.
+ blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
+
+ if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+ logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
+ "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
+ }
+
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+ // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
+ // we have to make sure this property is set to false to turn off auto commit mechanism in
+ // Kafka.
+ props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+ val consumerConfig = new ConsumerConfig(props)
+
+ assert(!consumerConfig.autoCommitEnable)
+
+ logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
+ consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+ messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
+ topics.values.sum, "KafkaMessageHandler")
+
+ blockGenerator.start()
+
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream =>
+ messageHandlerThreadPool.submit(new MessageHandler(stream))
+ }
+ }
+ }
+
+ override def onStop(): Unit = {
+ if (messageHandlerThreadPool != null) {
+ messageHandlerThreadPool.shutdown()
+ messageHandlerThreadPool = null
+ }
+
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (blockGenerator != null) {
+ blockGenerator.stop()
+ blockGenerator = null
+ }
+
+ if (topicPartitionOffsetMap != null) {
+ topicPartitionOffsetMap.clear()
+ topicPartitionOffsetMap = null
+ }
+
+ if (blockOffsetMap != null) {
+ blockOffsetMap.clear()
+ blockOffsetMap = null
+ }
+ }
+
+ /** Store a Kafka message and the associated metadata as a tuple. */
+ private def storeMessageAndMetadata(
+ msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
+ val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+ val data = (msgAndMetadata.key, msgAndMetadata.message)
+ val metadata = (topicAndPartition, msgAndMetadata.offset)
+ blockGenerator.addDataWithCallback(data, metadata)
+ }
+
+ /** Update stored offset */
+ private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+ topicPartitionOffsetMap.put(topicAndPartition, offset)
+ }
+
+ /**
+ * Remember the current offsets for each topic and partition. This is called when a block is
+ * generated.
+ */
+ private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
+ // Get a snapshot of current offset map and store with related block id.
+ val offsetSnapshot = topicPartitionOffsetMap.toMap
+ blockOffsetMap.put(blockId, offsetSnapshot)
+ topicPartitionOffsetMap.clear()
+ }
+
+ /**
+ * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
+ * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
+ */
+ private def storeBlockAndCommitOffset(
+ blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ var count = 0
+ var pushed = false
+ var exception: Exception = null
+ while (!pushed && count <= 3) {
+ try {
+ store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+ pushed = true
+ } catch {
+ case ex: Exception =>
+ count += 1
+ exception = ex
+ }
+ }
+ if (pushed) {
+ Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+ blockOffsetMap.remove(blockId)
+ } else {
+ stop("Error while storing block into Spark", exception)
+ }
+ }
+
+ /**
+ * Commit the offsets of Kafka topics/partitions; the commit mechanism follows Kafka 0.8.x's
+ * metadata schema in Zookeeper.
+ */
+ private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+ if (zkClient == null) {
+ val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+ stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
+ return
+ }
+
+ for ((topicAndPart, offset) <- offsetMap) {
+ try {
+ val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+ ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception during commit offset $offset for topic" +
+ s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
+ }
+
+ logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+ s"partition ${topicAndPart.partition}")
+ }
+ }
+
+ /** Class to handle received Kafka messages. */
+ private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+ override def run(): Unit = {
+ while (!isStopped) {
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext) {
+ storeMessageAndMetadata(streamIterator.next)
+ }
+ } catch {
+ case e: Exception =>
+ reportError("Error handling message", e)
+ }
+ }
+ }
+ }
+
+ /** Class to handle blocks generated by the block generator. */
+ private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+ def onAddData(data: Any, metadata: Any): Unit = {
+ // Update the offset of the data that was added to the generator
+ if (metadata != null) {
+ val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+ updateOffset(topicAndPartition, offset)
+ }
+ }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ // Remember the offsets of topics/partitions when a block has been generated
+ rememberBlockOffsets(blockId)
+ }
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ // Store block and commit the blocks offset
+ storeBlockAndCommitOffset(blockId, arrayBuffer)
+ }
+
+ def onError(message: String, throwable: Throwable): Unit = {
+ reportError(message, throwable)
+ }
+ }
+}
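ReliableKafkaReceiver is package-private and is not constructed directly; KafkaUtils.createStream selects it when the receiver write-ahead log is enabled. The sketch below shows that wiring under assumed values (master, Zookeeper quorum, group id, topic map, and checkpoint directory are all illustrative).
{{{
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("ReliableReceiverExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // makes createStream use ReliableKafkaReceiver
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/tmp/kafka-wal-checkpoint")  // the receiver write-ahead log needs a checkpoint directory

// With the WAL enabled, received data is already persisted before offsets are committed,
// so a non-replicated storage level avoids storing each block twice.
val stream = KafkaUtils.createStream(
  ssc, "zk1:2181,zk2:2181", "example-group", Map("events" -> 2),
  StorageLevel.MEMORY_AND_DISK_SER)
stream.map(_._2).print()
ssc.start()
ssc.awaitTermination()
}}}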
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
new file mode 100644
index 0000000000..2e5ab0fb3b
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka receiver for Spark Streaming.
+ */
+package org.apache.spark.streaming.kafka;
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
new file mode 100644
index 0000000000..47c5187f87
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Kafka receiver for Spark Streaming.
+ */
+package object kafka
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000000..fa6b0dbc8c
--- /dev/null
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ }
+
+ @After
+ public void tearDown() {
+ if (ssc != null) {
+ ssc.stop();
+ ssc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+ // hold a reference to the current offset ranges, so they can be used downstream
+ final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ Set<String> sent = new HashSet<>();
+ sent.addAll(Arrays.asList(topic1data));
+ sent.addAll(Arrays.asList(topic2data));
+
+ Map<String, String> kafkaParams = new HashMap<>();
+ kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+ kafkaParams.put("auto.offset.reset", "smallest");
+
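+ // With auto.offset.reset set to "smallest", the direct stream starts from the earliest available offsets.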
+ JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topicToSet(topic1)
+ ).transformToPair(
+ // Make sure you can get offset ranges from the rdd
+ new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+ @Override
+ public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ Assert.assertEquals(topic1, offsets[0].topic());
+ return rdd;
+ }
+ }
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> kv) {
+ return kv._2();
+ }
+ }
+ );
+
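+ // This overload starts from explicitly supplied per-partition offsets (here offset 0) and uses a message handler that keeps only the message value.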
+ JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ topicOffsetToMap(topic2, 0L),
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
+ return msgAndMd.message();
+ }
+ }
+ );
+ JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+ final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
+ unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ result.addAll(rdd.collect());
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
+ }
+ }
+ );
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ boolean matches = false;
+ while (!matches && System.currentTimeMillis() - startTime < 20000) {
+ matches = sent.size() == result.size();
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(sent, result);
+ ssc.stop();
+ }
+
+ private static Set<String> topicToSet(String topic) {
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add(topic);
+ return topicSet;
+ }
+
+ private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ Map<TopicAndPartition, Long> topicMap = new HashMap<>();
+ topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
+ return topicMap;
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic, 1);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000000..c41b6297b0
--- /dev/null
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+public class JavaKafkaRDDSuite implements Serializable {
+ private transient JavaSparkContext sc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ sc = new JavaSparkContext(sparkConf);
+ }
+
+ @After
+ public void tearDown() {
+ if (sc != null) {
+ sc.stop();
+ sc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaRDD() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ createTopicAndSendData(topic1);
+ createTopicAndSendData(topic2);
+
+ Map<String, String> kafkaParams = new HashMap<>();
+ kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+
+ OffsetRange[] offsetRanges = {
+ OffsetRange.create(topic1, 0, 0, 1),
+ OffsetRange.create(topic2, 0, 0, 1)
+ };
+
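+ // Passing the partition leaders up front lets createRDD skip the broker metadata lookup; an empty map makes it look the leaders up instead.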
+ Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
+ Map<TopicAndPartition, Broker> leaders = new HashMap<>();
+ String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
+ Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+ leaders.put(new TopicAndPartition(topic1, 0), broker);
+ leaders.put(new TopicAndPartition(topic2, 0), broker);
+
+ JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ offsetRanges
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> kv) {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ emptyLeaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ leaders,
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
+ return msgAndMd.message();
+ }
+ }
+ );
+
+ // Just making sure the Java user APIs work; the Scala tests handle the logic corner cases
+ long count1 = rdd1.count();
+ long count2 = rdd2.count();
+ long count3 = rdd3.count();
+ Assert.assertTrue(count1 > 0);
+ Assert.assertEquals(count1, count2);
+ Assert.assertEquals(count1, count3);
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic, 1);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
new file mode 100644
index 0000000000..868df64e8c
--- /dev/null
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient Random random = new Random();
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, new Duration(500));
+ }
+
+ @After
+ public void tearDown() {
+ if (ssc != null) {
+ ssc.stop();
+ ssc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ String topic = "topic1";
+ Map<String, Integer> topics = new HashMap<>();
+ topics.put(topic, 1);
+
+ Map<String, Integer> sent = new HashMap<>();
+ sent.put("a", 5);
+ sent.put("b", 3);
+ sent.put("c", 10);
+
+ kafkaTestUtils.createTopic(topic, 1);
+ kafkaTestUtils.sendMessages(topic, sent);
+
+ Map<String, String> kafkaParams = new HashMap<>();
+ kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
+ kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
+ kafkaParams.put("auto.offset.reset", "smallest");
+
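+ // The receiver-based stream uses Kafka's ZooKeeper-backed high-level consumer; a random group.id keeps test runs isolated.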
+ JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topics,
+ StorageLevel.MEMORY_ONLY_SER());
+
+ final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
+
+ JavaDStream<String> words = stream.map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) {
+ return tuple2._2();
+ }
+ }
+ );
+
+ words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
+ @Override
+ public void call(JavaPairRDD<String, Long> rdd) {
+ List<Tuple2<String, Long>> ret = rdd.collect();
+ for (Tuple2<String, Long> r : ret) {
+ if (result.containsKey(r._1())) {
+ result.put(r._1(), result.get(r._1()) + r._2());
+ } else {
+ result.put(r._1(), r._2());
+ }
+ }
+ }
+ }
+ );
+
+ ssc.start();
+
+ long startTime = System.currentTimeMillis();
+ boolean sizeMatches = false;
+ while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+ sizeMatches = sent.size() == result.size();
+ Thread.sleep(200);
+ }
+ Assert.assertEquals(sent.size(), result.size());
+ for (Map.Entry<String, Integer> e : sent.entrySet()) {
+ Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
+ }
+ }
+}
diff --git a/external/kafka-0-8/src/test/resources/log4j.properties b/external/kafka-0-8/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..fd51f8faf5
--- /dev/null
+++ b/external/kafka-0-8/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN
+
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000000..cb782d27fe
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with Eventually
+ with Logging {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ private var sc: SparkContext = _
+ private var ssc: StreamingContext = _
+ private var testDir: File = _
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ after {
+ if (ssc != null) {
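+ // ssc.stop() also stops the underlying SparkContext by default, so clear sc to avoid stopping it twice below.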
+ ssc.stop()
+ sc = null
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testDir != null) {
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+
+ test("basic stream receiving with multiple topics and smallest starting offset") {
+ val topics = Set("basic1", "basic2", "basic3")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+ val totalSent = data.values.sum * topics.size
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ }
+
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+ // hold a reference to the current offset ranges, so they can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+
+ stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd
+ }.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsetRanges(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify that the number of elements in each partition
+ // matches the size of the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ }
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+ }
+ ssc.stop()
+ }
+
+ test("receiving from largest starting offset") {
+ val topic = "largest"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() > 3)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(collectedData.contains("b"))
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+
+ test("creating stream by offset") {
+ val topic = "offset"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() >= 10)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream starting from an explicit offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+ ssc, kafkaParams, Map(topicPartition -> 11L),
+ (m: MessageAndMetadata[String, String]) => m.message())
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(collectedData.contains("b"))
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ // Test to verify the offset ranges can be recovered from the checkpoints
+ test("offset recovery") {
+ val topic = "recovery"
+ kafkaTestUtils.createTopic(topic)
+ testDir = Utils.createTempDir()
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ // Send data to Kafka and wait for it to be received
+ def sendDataAndWaitForReceive(data: Seq[Int]) {
+ val strings = data.map { _.toString}
+ kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
+ }
+ }
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val kafkaStream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+ val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+ Some(values.sum + state.getOrElse(0))
+ }
+ ssc.checkpoint(testDir.getAbsolutePath)
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
+ }
+
+ // This is to ensure that all the data is eventually received, each record exactly once
+ stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+ rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+ }
+ ssc.start()
+
+ // Send some data and wait for them to be received
+ for (i <- (1 to 10).grouped(4)) {
+ sendDataAndWaitForReceive(i)
+ }
+
+ ssc.stop()
+
+ // Verify that offset ranges were generated
+ // Since "offsetRangesAfterStop" will be compared with "recoveredOffsetRanges", we should
+ // collect the offset ranges after stopping. Otherwise, because new RDDs keep being generated
+ // before stopping, we may miss the latest RDDs, and "recoveredOffsetRanges" would then
+ // contain entries that are not in "offsetRangesAfterStop".
+ val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
+ assert(
+ offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
+ "starting offset not zero"
+ )
+
+ logInfo("====== RESTARTING ========")
+
+ // Recover context from checkpoints
+ ssc = new StreamingContext(testDir.getAbsolutePath)
+ val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+ // Verify offset ranges have been recovered
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+ assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+ val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+ assert(
+ recoveredOffsetRanges.forall { or =>
+ earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+ },
+ "Recovered ranges are not the same as the ones generated\n" +
+ s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+ s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+ )
+ // Restart the context, send more data, and verify the total at the end.
+ // If the total is right, it means each record has been received exactly once.
+ ssc.start()
+ sendDataAndWaitForReceive(11 to 20)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+ }
+ ssc.stop()
+ }
+
+ test("Direct Kafka stream report input information") {
+ val topic = "report-test"
+ val data = Map("a" -> 7, "b" -> 9)
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, data)
+
+ val totalSent = data.values.sum
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ import DirectKafkaStreamSuite._
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val collector = new InputInfoCollector
+ ssc.addStreamingListener(collector)
+
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]
+
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+
+ // Check the record counts collected by the StreamingListener.
+ assert(collector.numRecordsSubmitted.get() === totalSent)
+ assert(collector.numRecordsStarted.get() === totalSent)
+ assert(collector.numRecordsCompleted.get() === totalSent)
+ }
+ ssc.stop()
+ }
+
+ test("maxMessagesPerPartition with backpressure disabled") {
+ val topic = "maxMessagesPerPartition"
+ val kafkaStream = getDirectKafkaStream(topic, None)
+
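+ // With spark.streaming.kafka.maxRatePerPartition = 100 and a 100 ms batch interval, the cap works out to 10 messages per partition.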
+ val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+ }
+
+ test("maxMessagesPerPartition with no lag") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+ val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+ assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+ }
+
+ test("maxMessagesPerPartition respects max rate") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
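+ // The rate controller reports 1000 records/sec, but maxRatePerPartition = 100 still caps each partition at 10 messages per 100 ms batch.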
+ val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+ }
+
+ test("using rate controller") {
+ val topic = "backpressure"
+ val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
+ kafkaTestUtils.createTopic(topic, 2)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ val batchIntervalMilliseconds = 100
+ val estimator = new ConstantEstimator(100)
+ val messages = Map("foo" -> 200)
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ val kc = new KafkaCluster(kafkaParams)
+ val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+ val m = kc.getEarliestLeaderOffsets(topicPartitions)
+ .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+ new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+ ssc, kafkaParams, m, messageHandler) {
+ override protected[streaming] val rateController =
+ Some(new DirectKafkaRateController(id, estimator))
+ }
+ }
+
+ val collectedData = new ConcurrentLinkedQueue[Array[String]]()
+
+ // Used for assertion failure messages.
+ def dataToString: String =
+ collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ collectedData.add(data)
+ }
+
+ ssc.start()
+
+ // Try different rate limits.
+ // Wait for arrays of data to appear matching the rate.
+ Seq(100, 50, 20).foreach { rate =>
+ collectedData.clear() // Empty this buffer on each pass.
+ estimator.updateRate(rate) // Set a new rate.
+ // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+ val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+ eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+ // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+ // Funky "-" in message makes the complete assertion message read better.
+ assert(collectedData.asScala.exists(_.size == expectedSize),
+ s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+ }
+ }
+
+ ssc.stop()
+ }
+
+ /** Get the generated offset ranges from the DirectKafkaStream */
+ private def getOffsetRanges[K, V](
+ kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+ kafkaStream.generatedRDDs.mapValues { rdd =>
+ rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
+ }.toSeq.sortBy { _._1 }
+ }
+
+ private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+ val batchIntervalMilliseconds = 100
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+ val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+ new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+ ssc, Map[String, String](), earliestOffsets, messageHandler) {
+ override protected[streaming] val rateController = mockRateController
+ }
+ }
+}
+
+object DirectKafkaStreamSuite {
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ @volatile var total = -1L
+
+ class InputInfoCollector extends StreamingListener {
+ val numRecordsSubmitted = new AtomicLong(0L)
+ val numRecordsStarted = new AtomicLong(0L)
+ val numRecordsCompleted = new AtomicLong(0L)
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+ numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+ numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
+ }
+ }
+}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+ extends RateEstimator {
+
+ def updateRate(newRate: Long): Unit = {
+ rate = newRate
+ }
+
+ def compute(
+ time: Long,
+ elements: Long,
+ processingDelay: Long,
+ schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ override def getLatestRate(): Long = rate
+}
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..d66830cbac
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+
+class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
+ private val topic = "kcsuitetopic" + Random.nextInt(10000)
+ private val topicAndPartition = TopicAndPartition(topic, 0)
+ private var kc: KafkaCluster = null
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll() {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
+ kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
+ }
+
+ override def afterAll() {
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ test("metadata apis") {
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+ val leaderAddress = s"${leader._1}:${leader._2}"
+ assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
+
+ val parts = kc.getPartitions(Set(topic)).right.get
+ assert(parts(topicAndPartition), "didn't get partitions")
+
+ val err = kc.getPartitions(Set(topic + "BAD"))
+ assert(err.isLeft, "getPartitions for a nonexistent topic should be an error")
+ }
+
+ test("leader offset apis") {
+ val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+ val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+ assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+ }
+
+ test("consumer offset apis") {
+ val group = "kcsuitegroup" + Random.nextInt(10000)
+
+ val offset = Random.nextInt(10000)
+
+ val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+ assert(set.isRight, "didn't set consumer offsets")
+
+ val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+ assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+ }
+}
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..5e539c1d79
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ private val sparkConf = new SparkConf().setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ private var sc: SparkContext = _
+
+ override def beforeAll {
+ sc = new SparkContext(sparkConf)
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ test("basic usage") {
+ val topic = s"topicbasic-${Random.nextInt}"
+ kafkaTestUtils.createTopic(topic)
+ val messages = Array("the", "quick", "brown", "fox")
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt}")
+
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ sc, kafkaParams, offsetRanges)
+
+ val received = rdd.map(_._2).collect.toSet
+ assert(received === messages.toSet)
+
+ // size-related method optimizations return sane results
+ assert(rdd.count === messages.size)
+ assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+ assert(!rdd.isEmpty)
+ assert(rdd.take(1).size === 1)
+ assert(rdd.take(1).head._2 === messages.head)
+ assert(rdd.take(messages.size + 10).size === messages.size)
+
+ val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
+
+ assert(emptyRdd.isEmpty)
+
+ // invalid offset ranges throw exceptions
+ val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+ intercept[SparkException] {
+ KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+ sc, kafkaParams, badRanges)
+ }
+ }
+
+ test("iterator boundary conditions") {
+ // The idea is to find off-by-one errors, e.g. between what Kafka has available and what the RDD contains.
+ val topic = s"topicboundary-${Random.nextInt}"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ kafkaTestUtils.createTopic(topic)
+
+ val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt}")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ // this is the "lots of messages" case
+ kafkaTestUtils.sendMessages(topic, sent)
+ val sentCount = sent.values.sum
+
+ // rdd defined from leaders after sending messages, should get the number sent
+ val rdd = getRdd(kc, Set(topic))
+
+ assert(rdd.isDefined)
+
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
+
+ assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+ assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+ val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ // make sure consumer offsets are committed before the next getRdd call
+ kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
+ err => throw new Exception(err.mkString("\n")),
+ _ => ()
+ )
+
+ // this is the "0 messages" case
+ val rdd2 = getRdd(kc, Set(topic))
+ // shouldn't get anything, since the message is sent after the rdd was defined
+ val sentOnlyOne = Map("d" -> 1)
+
+ kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+
+ assert(rdd2.isDefined)
+ assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
+
+ // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
+ val rdd3 = getRdd(kc, Set(topic))
+ // send lots of messages after rdd was defined, they shouldn't show up
+ kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
+
+ assert(rdd3.isDefined)
+ assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
+
+ }
+
+ // Get an rdd spanning from the committed consumer offsets to the latest leader offsets.
+ private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+ val groupId = kc.kafkaParams("group.id")
+ def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
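+ // Prefer the committed consumer offsets; fall back to the earliest leader offsets when none have been committed yet.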
+ kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+ kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+ offs.map(kv => kv._1 -> kv._2.offset)
+ }
+ )
+ }
+ kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+ consumerOffsets(topicPartitions).flatMap { from =>
+ kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+ val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+ OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+ }.toArray
+
+ val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+ tp -> Broker(lo.host, lo.port)
+ }.toMap
+
+ KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+ sc, kc.kafkaParams, offsetRanges, leaders,
+ (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+ }
+ }
+ }
+ }
+}
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
new file mode 100644
index 0000000000..6a35ac14a8
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
+ private var ssc: StreamingContext = _
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll(): Unit = {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ test("Kafka input stream") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, sent)
+
+ val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+ "auto.offset.reset" -> "smallest")
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map(_._2).countByValue().foreachRDD { r =>
+ r.collect().foreach { kv =>
+ result.synchronized {
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
+ }
+ }
+
+ ssc.start()
+
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ assert(result.synchronized { sent === result })
+ }
+ }
+}
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000000..7b9aee39ff
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.util.Utils
+
+class ReliableKafkaStreamSuite extends SparkFunSuite
+ with BeforeAndAfterAll with BeforeAndAfter with Eventually {
+
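+ // The write ahead log is what makes this receiver "reliable": received blocks are persisted before offsets are committed to ZooKeeper.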
+ private val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ private var groupId: String = _
+ private var kafkaParams: Map[String, String] = _
+ private var ssc: StreamingContext = _
+ private var tempDirectory: File = null
+
+ override def beforeAll(): Unit = {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+
+ groupId = s"test-consumer-${Random.nextInt(10000)}"
+ kafkaParams = Map(
+ "zookeeper.connect" -> kafkaTestUtils.zkAddress,
+ "group.id" -> groupId,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ tempDirectory = Utils.createTempDir()
+ }
+
+ override def afterAll(): Unit = {
+ Utils.deleteRecursively(tempDirectory)
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ before {
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ ssc.checkpoint(tempDirectory.getAbsolutePath)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ }
+
+ test("Reliable Kafka input stream with single topic") {
+ val topic = "test-topic"
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, data)
+
+ // Verify that no offset has been committed for this group/topic/partition before starting.
+ assert(getCommitOffset(groupId, topic, 0) === None)
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map { case (k, v) => v }.foreachRDD { r =>
+ val ret = r.collect()
+ ret.foreach { v =>
+ val count = result.getOrElseUpdate(v, 0) + 1
+ result.put(v, count)
+ }
+ }
+ ssc.start()
+
+ eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+ // A basic end-to-end check for ReliableKafkaReceiver.
+ // Verify that the same set of keys was received as was sent.
+ assert(data.size === result.size)
+ // Verify that the count for each key matches the number of messages sent for it.
+ data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+ // Verify that the committed offset has advanced to the offset of the last message sent.
+ assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+ }
+ }
+
+ test("Reliable Kafka input stream with multiple topics") {
+ val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+ topics.foreach { case (t, _) =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+
+ // Before starting, verify that no offsets have been committed for any group/topic/partition.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
+
+ // Consume all the data sent to the broker, which will potentially commit the offsets internally.
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
+ stream.foreachRDD(_ => ())
+ ssc.start()
+
+ eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+ // Verify that the committed offset for each group/topic equals the expected value.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
+ }
+ }
+
+
+ /** Getting partition offset from Zookeeper. */
+ private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
+ val topicDirs = new ZKGroupTopicDirs(groupId, topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
+ ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
+ }
+}