author     cody koeninger <cody@koeninger.org>  2016-06-29 23:21:03 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-06-29 23:21:03 -0700
commit     dedbceec1ef33ccd88101016de969a1ef3e3e142 (patch)
tree       2b43daee5f9235eb96e77fe98447aa2414633599
parent     bde1d6a61593aeb62370f526542cead94919b0c0 (diff)
download   spark-dedbceec1ef33ccd88101016de969a1ef3e3e142.tar.gz
           spark-dedbceec1ef33ccd88101016de969a1ef3e3e142.tar.bz2
           spark-dedbceec1ef33ccd88101016de969a1ef3e3e142.zip
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
## What changes were proposed in this pull request?

New Kafka consumer API for the released 0.10 version of Kafka.

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #11863 from koeninger/kafka-0.9.
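For orientation, here is a minimal driver-side sketch of how the new API introduced by this patch is meant to be wired up. The broker addresses, topic, and group id are placeholders, and the exact `createDirectStream` signature lives in the new `KafkaUtils.scala` (not shown in this excerpt), so treat this as an assumption rather than the canonical usage:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setMaster("local[2]").setAppName("kafka010-sketch")
val ssc = new StreamingContext(conf, Seconds(5))

// Placeholder configuration; the deserializers come from the 0.10 client jar.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,                                        // LocationStrategy added by this patch
  Subscribe[String, String](List("topicA"), kafkaParams))  // ConsumerStrategy added by this patch

stream.map(record => (record.key, record.value)).print()

ssc.start()
ssc.awaitTermination()
```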
-rw-r--r--  external/kafka-0-10-assembly/pom.xml  176
-rw-r--r--  external/kafka-0-10/pom.xml  98
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala  189
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala  314
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala  318
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala  232
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala  45
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala  277
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala  175
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala  77
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala  153
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java  21
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala  23
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java  84
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java  180
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java  122
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java  58
-rw-r--r--  external/kafka-0-10/src/test/resources/log4j.properties  28
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala  612
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala  169
-rw-r--r--  pom.xml  2
-rw-r--r--  project/SparkBuild.scala  12
22 files changed, 3359 insertions, 6 deletions
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
new file mode 100644
index 0000000000..f2468d1cba
--- /dev/null
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -0,0 +1,176 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-10-assembly_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Integration for Kafka 0.10 Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <sbt.project.name>streaming-kafka-0-10-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!--
+ Demote dependencies that are already included in the Spark assembly to provided scope.
+ -->
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>${avro.mapred.classifier}</classifier>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+</build>
+</project>
+
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
new file mode 100644
index 0000000000..50395f6d14
--- /dev/null
+++ b/external/kafka-0-10/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Integration for Kafka 0.10</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>0.10.0.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ <version>3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
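For applications that will consume this module once it is published, the coordinates declared in the pom above correspond to an sbt dependency along these lines. The version shown is this branch's snapshot; substitute the released Spark version, and note that publication to the usual repositories is assumed:

```scala
// build.sbt: %% appends the Scala binary version (2.11 for this branch).
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0-SNAPSHOT"
```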
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
new file mode 100644
index 0000000000..fa3ea6131a
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.{ KafkaException, TopicPartition }
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of a single TopicPartition, intended for cached reuse.
+ * The underlying consumer is not thread-safe, so neither is this class,
+ * but processing the same TopicPartition and group id in multiple threads is usually bad anyway.
+ */
+private[kafka010]
+class CachedKafkaConsumer[K, V] private(
+ val groupId: String,
+ val topic: String,
+ val partition: Int,
+ val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+ assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+ "groupId used for cache key must match the groupId in kafkaParams")
+
+ val topicPartition = new TopicPartition(topic, partition)
+
+ protected val consumer = {
+ val c = new KafkaConsumer[K, V](kafkaParams)
+ val tps = new ju.ArrayList[TopicPartition]()
+ tps.add(topicPartition)
+ c.assign(tps)
+ c
+ }
+
+ // TODO if the buffer was kept around as a random-access structure,
+ // could possibly optimize re-calculating of an RDD in the same batch
+ protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
+ protected var nextOffset = -2L
+
+ def close(): Unit = consumer.close()
+
+ /**
+ * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
+ * Sequential forward access will use buffers, but random access will be horribly inefficient.
+ */
+ def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+ logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
+ if (offset != nextOffset) {
+ logInfo(s"Initial fetch for $groupId $topic $partition $offset")
+ seek(offset)
+ poll(timeout)
+ }
+
+ if (!buffer.hasNext()) { poll(timeout) }
+ assert(buffer.hasNext(),
+ s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+ var record = buffer.next()
+
+ if (record.offset != offset) {
+ logInfo(s"Buffer miss for $groupId $topic $partition $offset")
+ seek(offset)
+ poll(timeout)
+ assert(buffer.hasNext(),
+ s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+ record = buffer.next()
+ assert(record.offset == offset,
+ s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
+ }
+
+ nextOffset = offset + 1
+ record
+ }
+
+ private def seek(offset: Long): Unit = {
+ logDebug(s"Seeking to $topicPartition $offset")
+ consumer.seek(topicPartition, offset)
+ }
+
+ private def poll(timeout: Long): Unit = {
+ val p = consumer.poll(timeout)
+ val r = p.records(topicPartition)
+ logDebug(s"Polled ${p.partitions()} ${r.size}")
+ buffer = r.iterator
+ }
+
+}
+
+private[kafka010]
+object CachedKafkaConsumer extends Logging {
+
+ private case class CacheKey(groupId: String, topic: String, partition: Int)
+
+ // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
+ private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
+
+ /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
+ def init(
+ initialCapacity: Int,
+ maxCapacity: Int,
+ loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
+ if (null == cache) {
+ logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
+ cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
+ initialCapacity, loadFactor, true) {
+ override def removeEldestEntry(
+ entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
+ if (this.size > maxCapacity) {
+ try {
+ entry.getValue.consumer.close()
+ } catch {
+ case x: KafkaException =>
+ logError("Error closing oldest Kafka consumer", x)
+ }
+ true
+ } else {
+ false
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Get a cached consumer for groupId, assigned to topic and partition.
+ * If a matching consumer doesn't already exist, one will be created using kafkaParams.
+ */
+ def get[K, V](
+ groupId: String,
+ topic: String,
+ partition: Int,
+ kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+ CachedKafkaConsumer.synchronized {
+ val k = CacheKey(groupId, topic, partition)
+ val v = cache.get(k)
+ if (null == v) {
+ logInfo(s"Cache miss for $k")
+ logDebug(cache.keySet.toString)
+ val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+ cache.put(k, c)
+ c
+ } else {
+ // any given topicpartition should have a consistent key and value type
+ v.asInstanceOf[CachedKafkaConsumer[K, V]]
+ }
+ }
+
+ /**
+ * Get a new instance, unassociated with the global cache.
+ * The caller is responsible for closing it.
+ */
+ def getUncached[K, V](
+ groupId: String,
+ topic: String,
+ partition: Int,
+ kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+ new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+
+ /** Remove the consumer for the given groupId, topic, and partition, if it exists */
+ def remove(groupId: String, topic: String, partition: Int): Unit = {
+ val k = CacheKey(groupId, topic, partition)
+ logInfo(s"Removing $k from cache")
+ val v = CachedKafkaConsumer.synchronized {
+ cache.remove(k)
+ }
+ if (null != v) {
+ v.close()
+ logInfo(s"Removed $k from cache")
+ }
+ }
+}
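To make the caching contract above concrete, here is a schematic of the call sequence the rest of this patch (the KafkaRDD iterator further down) drives against this class. The class is package-private, so the sketch declares itself inside the kafka010 package, and the capacities, broker address, topic, and group id are illustrative only:

```scala
package org.apache.spark.streaming.kafka010

import java.{ util => ju }

import org.apache.kafka.common.serialization.StringDeserializer

object CachedConsumerSketch {
  def example(): Unit = {
    // Illustrative executor-side params; group.id must match the groupId used as the cache key.
    val executorKafkaParams = new ju.HashMap[String, Object]()
    executorKafkaParams.put("bootstrap.servers", "broker1:9092")
    executorKafkaParams.put("group.id", "example-group")
    executorKafkaParams.put("key.deserializer", classOf[StringDeserializer])
    executorKafkaParams.put("value.deserializer", classOf[StringDeserializer])

    // One-time, per-JVM cache setup; further calls are ignored.
    CachedKafkaConsumer.init(initialCapacity = 16, maxCapacity = 64, loadFactor = 0.75f)

    // Fetch (or create and cache) the consumer assigned to this group/topic/partition.
    val consumer = CachedKafkaConsumer.get[String, String](
      "example-group", "topicA", 0, executorKafkaParams)

    // Sequential forward reads hit the internal buffer; each call waits up to `timeout` ms for IO.
    val record = consumer.get(offset = 42L, timeout = 512L)
    println(s"${record.offset} ${record.key} ${record.value}")
  }
}
```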
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
new file mode 100644
index 0000000000..079a07dbc2
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
+ * instantiation. This interface encapsulates that process, and allows it to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+trait ConsumerStrategy[K, V] {
+ /**
+ * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ def executorKafkaParams: ju.Map[String, Object]
+
+ /**
+ * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+ * This consumer will be used on the driver to query for offsets only, not messages.
+ * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
+ * has successfully read. Will be empty on initial start, possibly non-empty on restart from
+ * checkpoint.
+ */
+ def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Subscribe[K, V] private(
+ topics: ju.Collection[java.lang.String],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, Long]
+ ) extends ConsumerStrategy[K, V] {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.subscribe(topics)
+ if (currentOffsets.isEmpty) {
+ offsets.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object for creating [[Subscribe]] strategy
+ */
+@Experimental
+object Subscribe {
+ /**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def apply[K, V](
+ topics: Iterable[java.lang.String],
+ kafkaParams: collection.Map[String, Object],
+ offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
+ Subscribe[K, V](
+ new ju.ArrayList(topics.asJavaCollection),
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ }
+
+ /**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def apply[K, V](
+ topics: Iterable[java.lang.String],
+ kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
+ Subscribe[K, V](
+ new ju.ArrayList(topics.asJavaCollection),
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ ju.Collections.emptyMap[TopicPartition, Long]())
+ }
+
+ /**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def create[K, V](
+ topics: ju.Collection[java.lang.String],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
+ Subscribe[K, V](topics, kafkaParams, offsets)
+ }
+
+ /**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def create[K, V](
+ topics: ju.Collection[java.lang.String],
+ kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
+ Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+ }
+
+}
+
+/**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Assign[K, V] private(
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, Long]
+ ) extends ConsumerStrategy[K, V] {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.assign(topicPartitions)
+ if (currentOffsets.isEmpty) {
+ offsets.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object for creating [[Assign]] strategy
+ */
+@Experimental
+object Assign {
+ /**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def apply[K, V](
+ topicPartitions: Iterable[TopicPartition],
+ kafkaParams: collection.Map[String, Object],
+ offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
+ Assign[K, V](
+ new ju.ArrayList(topicPartitions.asJavaCollection),
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ }
+
+ /**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def apply[K, V](
+ topicPartitions: Iterable[TopicPartition],
+ kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
+ Assign[K, V](
+ new ju.ArrayList(topicPartitions.asJavaCollection),
+ new ju.HashMap[String, Object](kafkaParams.asJava),
+ ju.Collections.emptyMap[TopicPartition, Long]())
+ }
+
+ /**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ @Experimental
+ def create[K, V](
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
+ Assign[K, V](topicPartitions, kafkaParams, offsets)
+ }
+
+ /**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+ @Experimental
+ def create[K, V](
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
+ Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+ }
+}
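A quick side-by-side of constructing the two strategies defined above from Scala, using the `apply` overloads shown in this file. The topic names, partitions, and starting offsets are placeholders, and the kafkaParams map is abbreviated:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ Assign, Subscribe }

// Abbreviated params; a real map also needs key/value deserializers.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "group.id" -> "example-group")

// Subscribe: track the named topics (and whatever partitions they currently have).
val byTopic = Subscribe[String, String](List("topicA", "topicB"), kafkaParams)

// Assign: pin an explicit set of TopicPartitions, optionally with starting offsets.
val partitions = List(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1))
val startOffsets = Map(new TopicPartition("topicA", 0) -> 100L)
val byPartition = Assign[String, String](partitions, kafkaParams, startOffsets)
```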
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..acd1841d53
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A DStream where each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>.
+ * Requires "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ * see [[ConsumerStrategy]] for more details
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+ _ssc: StreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V]
+ ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
+
+ val executorKafkaParams = {
+ val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
+ KafkaUtils.fixKafkaParams(ekp)
+ ekp
+ }
+
+ protected var currentOffsets = Map[TopicPartition, Long]()
+
+ @transient private var kc: Consumer[K, V] = null
+ def consumer(): Consumer[K, V] = this.synchronized {
+ if (null == kc) {
+ kc = consumerStrategy.onStart(currentOffsets)
+ }
+ kc
+ }
+
+ override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+ logError("Kafka ConsumerRecord is not serializable. " +
+ "Use .map to extract fields before calling .persist or .window")
+ super.persist(newLevel)
+ }
+
+ protected def getBrokers = {
+ val c = consumer
+ val result = new ju.HashMap[TopicPartition, String]()
+ val hosts = new ju.HashMap[TopicPartition, String]()
+ val assignments = c.assignment().iterator()
+ while (assignments.hasNext()) {
+ val tp: TopicPartition = assignments.next()
+ if (null == hosts.get(tp)) {
+ val infos = c.partitionsFor(tp.topic).iterator()
+ while (infos.hasNext()) {
+ val i = infos.next()
+ hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+ }
+ }
+ result.put(tp, hosts.get(tp))
+ }
+ result
+ }
+
+ protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
+ locationStrategy match {
+ case PreferBrokers => getBrokers
+ case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+ case PreferFixed(hostMap) => hostMap
+ }
+ }
+
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+
+ /**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+
+ protected[streaming] def maxMessagesPerPartition(
+ offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+ // calculate a per-partition rate limit based on current lag
+ val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+ case Some(rate) =>
+ val lagPerPartition = offsets.map { case (tp, offset) =>
+ tp -> Math.max(offset - currentOffsets(tp), 0)
+ }
+ val totalLag = lagPerPartition.values.sum
+
+ lagPerPartition.map { case (tp, lag) =>
+ val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ tp -> (if (maxRateLimitPerPartition > 0) {
+ Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+ }
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ }
+
+ if (effectiveRateLimitPerPartition.values.sum > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some(effectiveRateLimitPerPartition.map {
+ case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ })
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Returns the latest (highest) available offsets, taking new partitions into account.
+ */
+ protected def latestOffsets(): Map[TopicPartition, Long] = {
+ val c = consumer
+ c.poll(0)
+ val parts = c.assignment().asScala
+
+ // make sure new partitions are reflected in currentOffsets
+ val newPartitions = parts.diff(currentOffsets.keySet)
+ // position for new partitions determined by auto.offset.reset if no commit
+ currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+ // don't want to consume messages, so pause
+ c.pause(newPartitions.asJava)
+ // find latest available offsets
+ c.seekToEnd(currentOffsets.keySet.asJava)
+ parts.map(tp => tp -> c.position(tp)).toMap
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+
+ maxMessagesPerPartition(offsets).map { mmp =>
+ mmp.map { case (tp, messages) =>
+ val uo = offsets(tp)
+ tp -> Math.min(currentOffsets(tp) + messages, uo)
+ }
+ }.getOrElse(offsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
+ val untilOffsets = clamp(latestOffsets())
+ val offsetRanges = untilOffsets.map { case (tp, uo) =>
+ val fo = currentOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo)
+ }
+ val rdd = new KafkaRDD[K, V](
+ context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+
+ // Report the record number and metadata of this batch interval to InputInfoTracker.
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to an immutable.List to prevent it from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+ currentOffsets = untilOffsets
+ commitAll()
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ val c = consumer
+ c.poll(0)
+ if (currentOffsets.isEmpty) {
+ currentOffsets = c.assignment().asScala.map { tp =>
+ tp -> c.position(tp)
+ }.toMap
+ }
+
+ // don't actually want to consume any messages, so pause all partitions
+ c.pause(currentOffsets.keySet.asJava)
+ }
+
+ override def stop(): Unit = this.synchronized {
+ if (kc != null) {
+ kc.close()
+ }
+ }
+
+ protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+ protected val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+ /**
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ */
+ def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
+ commitAsync(offsetRanges, null)
+ }
+
+ /**
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ * @param callback Only the most recently provided callback will be used at commit.
+ */
+ def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
+ commitCallback.set(callback)
+ commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
+ }
+
+ protected def commitAll(): Unit = {
+ val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
+ val it = commitQueue.iterator()
+ while (it.hasNext) {
+ val osr = it.next
+ val tp = osr.topicPartition
+ val x = m.get(tp)
+ val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
+ m.put(tp, new OffsetAndMetadata(offset))
+ }
+ if (!m.isEmpty) {
+ consumer.commitAsync(m, commitCallback.get)
+ }
+ }
+
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
+
+ override def update(time: Time): Unit = {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time): Unit = { }
+
+ override def restore(): Unit = {
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V](
+ context.sparkContext,
+ executorKafkaParams,
+ b.map(OffsetRange(_)),
+ getPreferredHosts,
+ // during restore, it's possible the same partition will be consumed from multiple
+ // threads, so don't use the cache
+ false
+ )
+ }
+ }
+ }
+
+ /**
+ * A RateController to retrieve the rate from RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
+}
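Because the DStream above mixes in CanCommitOffsets, the intended at-least-once pattern is to commit a batch's offset ranges only after its output has succeeded. A sketch, assuming `stream` was created as in the earlier example and that the output write is left as a placeholder:

```scala
import org.apache.spark.streaming.kafka010.{ CanCommitOffsets, HasOffsetRanges }

stream.foreachRDD { rdd =>
  // Offset ranges are only exposed on the original, untransformed KafkaRDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { records =>
    // ... write records to the downstream store ...
  }

  // Queues the ranges; the actual commit happens on the driver in a later batch (see commitAll).
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```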
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
new file mode 100644
index 0000000000..c15c163449
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]].
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on the same nodes as your brokers.
+ * @param useConsumerCache whether to use a consumer from a per-JVM cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class KafkaRDD[K, V](
+ sc: SparkContext,
+ val kafkaParams: ju.Map[String, Object],
+ val offsetRanges: Array[OffsetRange],
+ val preferredHosts: ju.Map[TopicPartition, String],
+ useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
+
+ assert("none" ==
+ kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+ " must be set to none for executor kafka params, else messages may not match offsetRange")
+
+ assert(false ==
+ kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+ " must be set to false for executor kafka params, else offsets may commit before processing")
+
+ // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+ private val cacheInitialCapacity =
+ conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+ private val cacheMaxCapacity =
+ conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+ private val cacheLoadFactor =
+ conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+
+ override def persist(newLevel: StorageLevel): this.type = {
+ logError("Kafka ConsumerRecord is not serializable. " +
+ "Use .map to extract fields before calling .persist or .window")
+ super.persist(newLevel)
+ }
+
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map { case (o, i) =>
+ new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+ }.toArray
+ }
+
+ override def count(): Long = offsetRanges.map(_.count).sum
+
+ override def countApprox(
+ timeout: Long,
+ confidence: Double = 0.95
+ ): PartialResult[BoundedDouble] = {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
+
+ override def isEmpty(): Boolean = count == 0L
+
+ override def take(num: Int): Array[ConsumerRecord[K, V]] = {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
+
+ if (num < 1 || nonEmptyPartitions.isEmpty) {
+ return new Array[ConsumerRecord[K, V]](0)
+ }
+
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.count)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ val buf = new ArrayBuffer[ConsumerRecord[K, V]]
+ val res = context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+ it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+ )
+ res.foreach(buf ++= _)
+ buf.toArray
+ }
+
+ private def executors(): Array[ExecutorCacheTaskLocation] = {
+ val bm = sparkContext.env.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compareExecutors)
+ }
+
+ protected[kafka010] def compareExecutors(
+ a: ExecutorCacheTaskLocation,
+ b: ExecutorCacheTaskLocation): Boolean =
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+
+ /**
+ * Non-negative modulus, from java 8 math
+ */
+ private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
+
+ override def getPreferredLocations(thePart: Partition): Seq[String] = {
+ // The intention is best-effort consistent executor for a given topicpartition,
+ // so that caching consumers can be effective.
+ // TODO what about hosts specified by ip vs name
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ val allExecs = executors()
+ val tp = part.topicPartition
+ val prefHost = preferredHosts.get(tp)
+ val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
+ val execs = if (prefExecs.isEmpty) allExecs else prefExecs
+ if (execs.isEmpty) {
+ Seq()
+ } else {
+ // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
+ val index = this.floorMod(tp.hashCode, execs.length)
+ val chosen = execs(index)
+ Seq(chosen.toString)
+ }
+ }
+
+ private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+ s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+ override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ if (part.fromOffset == part.untilOffset) {
+ logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+ s"skipping ${part.topic} ${part.partition}")
+ Iterator.empty
+ } else {
+ new KafkaRDDIterator(part, context)
+ }
+ }
+
+ /**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+ private class KafkaRDDIterator(
+ part: KafkaRDDPartition,
+ context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
+
+ logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+ val consumer = if (useConsumerCache) {
+ CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+ if (context.attemptNumber > 1) {
+ // just in case the prior attempt failures were cache related
+ CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
+ }
+ CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+ } else {
+ CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+ }
+
+ var requestOffset = part.fromOffset
+
+ def closeIfNeeded(): Unit = {
+ if (!useConsumerCache && consumer != null) {
+ consumer.close
+ }
+ }
+
+ override def hasNext(): Boolean = requestOffset < part.untilOffset
+
+ override def next(): ConsumerRecord[K, V] = {
+ assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
+ val r = consumer.get(requestOffset, pollTimeout)
+ requestOffset += 1
+ r
+ }
+ }
+}
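For batch jobs, the same OffsetRange and location machinery is exposed through KafkaUtils.createRDD, whose body lives in the new KafkaUtils.scala and is not shown in this excerpt, so the exact signature below is an assumption. The broker address, group id, topic, and offsets are placeholders; note that the assertions in KafkaRDD above require auto.offset.reset=none and auto commit disabled:

```scala
import java.{ util => ju }

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{ KafkaUtils, OffsetRange, PreferConsistent }

val sc = new SparkContext("local[2]", "kafka010-batch-sketch")

// Each OffsetRange is (topic, partition, fromOffset inclusive, untilOffset exclusive)
// and becomes exactly one RDD partition.
val offsetRanges = Array(
  OffsetRange("topicA", 0, 0L, 100L),
  OffsetRange("topicA", 1, 0L, 100L))

// Executor-side params, satisfying the assertions in KafkaRDD.
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "broker1:9092")
kafkaParams.put("group.id", "example-group")
kafkaParams.put("auto.offset.reset", "none")
kafkaParams.put("enable.auto.commit", java.lang.Boolean.FALSE)
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])

val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)
println(rdd.count())  // == offsetRanges.map(_.count).sum == 200
```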
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..95569b109f
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.Partition
+
+
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ */
+private[kafka010]
+class KafkaRDDPartition(
+ val index: Int,
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long
+) extends Partition {
+ /** Number of messages this partition refers to */
+ def count(): Long = untilOffset - fromOffset
+
+ /** Kafka TopicPartition object, for convenience */
+ def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000000..13c08430db
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put the Kafka test utility class in src is to test Python-related Kafka APIs.
+ */
+private[kafka010] class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkUtils: ZkUtils = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: Producer[String, String] = _
+
+ // Flags to indicate whether the system has started correctly
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkUtils = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkUtils).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ (server, port)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** Set up all the embedded servers, including Zookeeper and the Kafka broker */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Tear down all the servers, including the Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
+
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+ if (zkUtils != null) {
+ zkUtils.close()
+ zkUtils = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p =>
+ waitUntilMetadataIsPropagated(topic, p)
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String): Unit = {
+ createTopic(topic, 1)
+ }
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Unit = {
+ producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+ producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+ producer.close()
+ producer = null
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("metadata.broker.list", brokerAddress)
+ props.put("serializer.class", classOf[StringEncoder].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("request.required.acks", "-1")
+ props
+ }
+
+ // A simplified version of scalatest's eventually, rewritten here to avoid adding an extra
+ // test dependency
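+ // Usage sketch (mirrors waitUntilMetadataIsPropagated below; `condition` is a placeholder):
+ //   eventually(Time(10000), Time(100)) { assert(condition) }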
+ def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+ def makeAttempt(): Either[Throwable, T] = {
+ try {
+ Right(func)
+ } catch {
+ case e if NonFatal(e) => Left(e)
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ @tailrec
+ def tryAgain(attempt: Int): T = {
+ makeAttempt() match {
+ case Right(result) => result
+ case Left(e) =>
+ val duration = System.currentTimeMillis() - startTime
+ if (duration < timeout.milliseconds) {
+ Thread.sleep(interval.milliseconds)
+ } else {
+ throw new TimeoutException(e.getMessage)
+ }
+
+ tryAgain(attempt + 1)
+ }
+ }
+
+ tryAgain(1)
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ Utils.deleteRecursively(snapshotDir)
+ Utils.deleteRecursively(logDir)
+ }
+ }
+}
+
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
new file mode 100644
index 0000000000..c0524990bc
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
+import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
+import org.apache.spark.streaming.dstream._
+
+/**
+ * :: Experimental ::
+ * Companion object for constructing Kafka streams and RDDs
+ */
+@Experimental
+object KafkaUtils extends Logging {
+ /**
+ * :: Experimental ::
+ * Scala constructor for a batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
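+ *
+ * A usage sketch (the topic name "events" and the offsets are illustrative; kafkaParams is
+ * assumed to be a java.util.Map with at least "bootstrap.servers" and deserializers set):
+ * {{{
+ *   val ranges = Array(OffsetRange("events", 0, 0L, 100L))
+ *   val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, ranges, PreferConsistent)
+ * }}}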
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createRDD[K, V](
+ sc: SparkContext,
+ kafkaParams: ju.Map[String, Object],
+ offsetRanges: Array[OffsetRange],
+ locationStrategy: LocationStrategy
+ ): RDD[ConsumerRecord[K, V]] = {
+ val preferredHosts = locationStrategy match {
+ case PreferBrokers =>
+ throw new AssertionError(
+ "If you want to prefer brokers, you must provide a mapping using PreferFixed " +
+ "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
+ case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+ case PreferFixed(hostMap) => hostMap
+ }
+ val kp = new ju.HashMap[String, Object](kafkaParams)
+ fixKafkaParams(kp)
+ val osr = offsetRanges.clone()
+
+ new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
+ }
+
+ /**
+ * :: Experimental ::
+ * Java constructor for a batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createRDD[K, V](
+ jsc: JavaSparkContext,
+ kafkaParams: ju.Map[String, Object],
+ offsetRanges: Array[OffsetRange],
+ locationStrategy: LocationStrategy
+ ): JavaRDD[ConsumerRecord[K, V]] = {
+
+ new JavaRDD(createRDD[K, V](jsc.sc, kafkaParams, offsetRanges, locationStrategy))
+ }
+
+ /**
+ * :: Experimental ::
+ * Scala constructor for a DStream where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum
+ * number of messages per second that each '''partition''' will accept.
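+ *
+ * A usage sketch, mirroring the test suites in this patch (topic names are illustrative and
+ * kafkaParams is assumed to be a Map[String, Object] with the usual consumer settings):
+ * {{{
+ *   val stream = KafkaUtils.createDirectStream[String, String](
+ *     ssc, PreferConsistent, Subscribe[String, String](List("topicA", "topicB"), kafkaParams))
+ * }}}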
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ * see [[ConsumerStrategy]] for more details
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createDirectStream[K, V](
+ ssc: StreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V]
+ ): InputDStream[ConsumerRecord[K, V]] = {
+ new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+ }
+
+ /**
+ * :: Experimental ::
+ * Java constructor for a DStream where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ * see [[ConsumerStrategy]] for more details
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createDirectStream[K, V](
+ jssc: JavaStreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V]
+ ): JavaInputDStream[ConsumerRecord[K, V]] = {
+ new JavaInputDStream(
+ createDirectStream[K, V](
+ jssc.ssc, locationStrategy, consumerStrategy))
+ }
+
+ /**
+ * Tweak kafka params to prevent issues on executors: force enable.auto.commit to false,
+ * set auto.offset.reset to none, use an executor-specific group.id, and enforce a minimum
+ * receive.buffer.bytes (see KAFKA-3135).
+ */
+ private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
+ logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
+ kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
+
+ logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
+ kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+
+ // driver and executor should be in different consumer groups
+ val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+ logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
+ kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+
+ // possible workaround for KAFKA-3135
+ val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
+ if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
+ logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
+ kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+ }
+ }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
new file mode 100644
index 0000000000..df620300ea
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * Choice of how to schedule consumers for a given TopicPartition on an executor.
+ * Kafka 0.10 consumers prefetch messages, so it's important for performance
+ * to keep cached consumers on appropriate executors, not recreate them for every partition.
+ * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
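+ *
+ * For example (host and topic names are illustrative):
+ * {{{
+ *   val consistent: LocationStrategy = PreferConsistent
+ *   val pinned = PreferFixed(Map(new TopicPartition("topicA", 0) -> "host1"))
+ * }}}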
+ */
+@Experimental
+sealed trait LocationStrategy
+
+/**
+ * :: Experimental ::
+ * Use this only if your executors are on the same nodes as your Kafka brokers.
+ */
+@Experimental
+case object PreferBrokers extends LocationStrategy {
+ def create: PreferBrokers.type = this
+}
+
+/**
+ * :: Experimental ::
+ * Use this in most cases, it will consistently distribute partitions across all executors.
+ */
+@Experimental
+case object PreferConsistent extends LocationStrategy {
+ def create: PreferConsistent.type = this
+}
+
+/**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+@Experimental
+case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
+
+/**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+@Experimental
+object PreferFixed {
+ def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
+ PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+ }
+ def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
+ PreferFixed(hostMap)
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
new file mode 100644
index 0000000000..c66d3c9b8d
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.clients.consumer.OffsetCommitCallback
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+trait HasOffsetRanges {
+ def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * :: Experimental ::
+ * Represents any object that can commit a collection of [[OffsetRange]]s.
+ * The direct Kafka DStream implements this interface (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ * val stream = KafkaUtils.createDirectStream(...)
+ * ...
+ * stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
+ * def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
+ * if (null != e) {
+ * // error
+ * } else {
+ * // success
+ * }
+ * }
+ * })
+ * }}}
+ */
+@Experimental
+trait CanCommitOffsets {
+ /**
+ * :: Experimental ::
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ */
+ @Experimental
+ def commitAsync(offsetRanges: Array[OffsetRange]): Unit
+
+ /**
+ * :: Experimental ::
+ * Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
+ * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+ * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+ * @param callback Only the most recently provided callback will be used at commit.
+ */
+ @Experimental
+ def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
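+ *
+ * For example (values are illustrative):
+ * {{{
+ *   val range = OffsetRange.create("topicA", 0, 0L, 100L)
+ * }}}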
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long) extends Serializable {
+ import OffsetRange.OffsetRangeTuple
+
+ /** Kafka TopicPartition object, for convenience */
+ def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+ /** Number of messages this OffsetRange refers to */
+ def count(): Long = untilOffset - fromOffset
+
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+ }
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[streaming]
+ def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+ def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def create(
+ topicPartition: TopicPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+ def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def apply(
+ topicPartition: TopicPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[kafka010]
+ type OffsetRangeTuple = (String, Int, Long, Long)
+
+ private[kafka010]
+ def apply(t: OffsetRangeTuple) =
+ new OffsetRange(t._1, t._2, t._3, t._4)
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
new file mode 100644
index 0000000000..ebfcf8764a
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package org.apache.spark.streaming.kafka010;
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
new file mode 100644
index 0000000000..2bfc1e84d7
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package object kafka010
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 0000000000..aba45f5de6
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+ @Test
+ public void testConsumerStrategyConstructors() {
+ final String topic1 = "topic1";
+ final Collection<String> topics = Arrays.asList(topic1);
+ final scala.collection.Iterable<String> sTopics =
+ JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+ final TopicPartition tp1 = new TopicPartition(topic1, 0);
+ final TopicPartition tp2 = new TopicPartition(topic1, 1);
+ final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
+ final scala.collection.Iterable<TopicPartition> sParts =
+ JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+ final Map<String, Object> kafkaParams = new HashMap<String, Object>();
+ kafkaParams.put("bootstrap.servers", "not used");
+ final scala.collection.Map<String, Object> sKafkaParams =
+ JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+ final Map<TopicPartition, Object> offsets = new HashMap<>();
+ offsets.put(tp1, 23L);
+ final scala.collection.Map<TopicPartition, Object> sOffsets =
+ JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+ // make sure constructors can be called from java
+ final ConsumerStrategy<String, String> sub0 =
+ Subscribe.<String, String>apply(topics, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> sub1 =
+ Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> sub2 =
+ Subscribe.<String, String>apply(sTopics, sKafkaParams);
+ final ConsumerStrategy<String, String> sub3 =
+ Subscribe.<String, String>create(topics, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> sub4 =
+ Subscribe.<String, String>create(topics, kafkaParams);
+
+ Assert.assertEquals(
+ sub1.executorKafkaParams().get("bootstrap.servers"),
+ sub3.executorKafkaParams().get("bootstrap.servers"));
+
+ final ConsumerStrategy<String, String> asn0 =
+ Assign.<String, String>apply(parts, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> asn1 =
+ Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
+ final ConsumerStrategy<String, String> asn2 =
+ Assign.<String, String>apply(sParts, sKafkaParams);
+ final ConsumerStrategy<String, String> asn3 =
+ Assign.<String, String>create(parts, kafkaParams, offsets);
+ final ConsumerStrategy<String, String> asn4 =
+ Assign.<String, String>create(parts, kafkaParams);
+
+ Assert.assertEquals(
+ asn1.executorKafkaParams().get("bootstrap.servers"),
+ asn3.executorKafkaParams().get("bootstrap.servers"));
+ }
+
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000000..e57ede7afa
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ }
+
+ @After
+ public void tearDown() {
+ if (ssc != null) {
+ ssc.stop();
+ ssc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+ // hold a reference to the current offset ranges, so they can be used downstream
+ final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ Set<String> sent = new HashSet<>();
+ sent.addAll(Arrays.asList(topic1data));
+ sent.addAll(Arrays.asList(topic2data));
+
+ Random random = new Random();
+
+ final Map<String, Object> kafkaParams = new HashMap<>();
+ kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+ kafkaParams.put("key.deserializer", StringDeserializer.class);
+ kafkaParams.put("value.deserializer", StringDeserializer.class);
+ kafkaParams.put("auto.offset.reset", "earliest");
+ kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
+ "-" + System.currentTimeMillis());
+
+ JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
+ ssc,
+ PreferConsistent.create(),
+ Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams)
+ );
+
+ JavaDStream<String> stream1 = istream1.transform(
+ // Make sure you can get offset ranges from the rdd
+ new Function<JavaRDD<ConsumerRecord<String, String>>,
+ JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public JavaRDD<ConsumerRecord<String, String>> call(
+ JavaRDD<ConsumerRecord<String, String>> rdd
+ ) {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ Assert.assertEquals(topic1, offsets[0].topic());
+ return rdd;
+ }
+ }
+ ).map(
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> r) {
+ return r.value();
+ }
+ }
+ );
+
+ final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
+ kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() +
+ "-" + System.currentTimeMillis());
+
+ JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
+ ssc,
+ PreferConsistent.create(),
+ Subscribe.<String, String>create(Arrays.asList(topic2), kafkaParams2)
+ );
+
+ JavaDStream<String> stream2 = istream2.transform(
+ // Make sure you can get offset ranges from the rdd
+ new Function<JavaRDD<ConsumerRecord<String, String>>,
+ JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public JavaRDD<ConsumerRecord<String, String>> call(
+ JavaRDD<ConsumerRecord<String, String>> rdd
+ ) {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ Assert.assertEquals(topic2, offsets[0].topic());
+ return rdd;
+ }
+ }
+ ).map(
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> r) {
+ return r.value();
+ }
+ }
+ );
+
+ JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+ final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
+ unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ result.addAll(rdd.collect());
+ }
+ }
+ );
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ boolean matches = false;
+ while (!matches && System.currentTimeMillis() - startTime < 20000) {
+ matches = sent.size() == result.size();
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(sent, result);
+ ssc.stop();
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000000..548ba134dc
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+public class JavaKafkaRDDSuite implements Serializable {
+ private transient JavaSparkContext sc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ sc = new JavaSparkContext(sparkConf);
+ }
+
+ @After
+ public void tearDown() {
+ if (sc != null) {
+ sc.stop();
+ sc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaRDD() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ createTopicAndSendData(topic1);
+ createTopicAndSendData(topic2);
+
+ Map<String, Object> kafkaParams = new HashMap<>();
+ kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+ kafkaParams.put("key.deserializer", StringDeserializer.class);
+ kafkaParams.put("value.deserializer", StringDeserializer.class);
+
+ OffsetRange[] offsetRanges = {
+ OffsetRange.create(topic1, 0, 0, 1),
+ OffsetRange.create(topic2, 0, 0, 1)
+ };
+
+ Map<TopicPartition, String> leaders = new HashMap<>();
+ String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
+ String broker = hostAndPort[0];
+ leaders.put(offsetRanges[0].topicPartition(), broker);
+ leaders.put(offsetRanges[1].topicPartition(), broker);
+
+ Function<ConsumerRecord<String, String>, String> handler =
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> r) {
+ return r.value();
+ }
+ };
+
+ JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
+ sc,
+ kafkaParams,
+ offsetRanges,
+ PreferFixed.create(leaders)
+ ).map(handler);
+
+ JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
+ sc,
+ kafkaParams,
+ offsetRanges,
+ PreferConsistent.create()
+ ).map(handler);
+
+ // just making sure the java user apis work; the scala tests handle logic corner cases
+ long count1 = rdd1.count();
+ long count2 = rdd2.count();
+ Assert.assertTrue(count1 > 0);
+ Assert.assertEquals(count1, count2);
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
new file mode 100644
index 0000000000..7873c09e1a
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaLocationStrategySuite implements Serializable {
+
+ @Test
+ public void testLocationStrategyConstructors() {
+ final String topic1 = "topic1";
+ final TopicPartition tp1 = new TopicPartition(topic1, 0);
+ final TopicPartition tp2 = new TopicPartition(topic1, 1);
+ final Map<TopicPartition, String> hosts = new HashMap<>();
+ hosts.put(tp1, "node1");
+ hosts.put(tp2, "node2");
+ final scala.collection.Map<TopicPartition, String> sHosts =
+ JavaConverters.mapAsScalaMapConverter(hosts).asScala();
+
+ // make sure constructors can be called from java
+ final LocationStrategy c1 = PreferConsistent.create();
+ final LocationStrategy c2 = PreferConsistent$.MODULE$;
+ Assert.assertEquals(c1, c2);
+
+ final LocationStrategy c3 = PreferBrokers.create();
+ final LocationStrategy c4 = PreferBrokers$.MODULE$;
+ Assert.assertEquals(c3, c4);
+
+ final LocationStrategy c5 = PreferFixed.create(hosts);
+ final LocationStrategy c6 = PreferFixed.apply(sHosts);
+ Assert.assertEquals(c5, c6);
+
+ }
+
+}
diff --git a/external/kafka-0-10/src/test/resources/log4j.properties b/external/kafka-0-10/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..75e3b53a09
--- /dev/null
+++ b/external/kafka-0-10/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000000..776d11ad2f
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.File
+import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with Eventually
+ with Logging {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ private var sc: SparkContext = _
+ private var ssc: StreamingContext = _
+ private var testDir: File = _
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ sc = null
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testDir != null) {
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+ def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
+ val kp = new JHashMap[String, Object]()
+ kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
+ kp.put("key.deserializer", classOf[StringDeserializer])
+ kp.put("value.deserializer", classOf[StringDeserializer])
+ kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
+ extra.foreach(e => kp.put(e._1, e._2))
+ kp
+ }
+
+ val preferredHosts = PreferConsistent
+
+ test("basic stream receiving with multiple topics and smallest starting offset") {
+ val topics = List("basic1", "basic2", "basic3")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+ val totalSent = data.values.sum * topics.size
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala))
+ }
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+ // hold a reference to the current offset ranges, so they can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+ val tf = stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd.map(r => (r.key, r.value))
+ }
+
+ tf.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsetRanges(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify that the number of elements in each partition
+ // matches the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ }
+
+ stream.foreachRDD { rdd =>
+ allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+ }
+ ssc.stop()
+ }
+
+ test("receiving from largest starting offset") {
+ val topic = "latest"
+ val topicPartition = new TopicPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
+ val kc = new KafkaConsumer(kafkaParams)
+ kc.assign(Arrays.asList(topicPartition))
+ def getLatestOffset(): Long = {
+ kc.seekToEnd(Arrays.asList(topicPartition))
+ kc.position(topicPartition)
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() > 3)
+ }
+ val offsetBeforeStart = getLatestOffset()
+ kc.close()
+
+ // Set up context and kafka stream with the largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ val s = new DirectKafkaInputDStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ s.consumer.poll(0)
+ assert(
+ s.consumer.position(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+ s
+ }
+
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.map { _.value }.foreachRDD { rdd =>
+ collectedData.addAll(Arrays.asList(rdd.collect(): _*))
+ }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+
+ test("creating stream by offset") {
+ val topic = "offset"
+ val topicPartition = new TopicPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
+ val kc = new KafkaConsumer(kafkaParams)
+ kc.assign(Arrays.asList(topicPartition))
+ def getLatestOffset(): Long = {
+ kc.seekToEnd(Arrays.asList(topicPartition))
+ kc.position(topicPartition)
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() >= 10)
+ }
+ val offsetBeforeStart = getLatestOffset()
+ kc.close()
+
+ // Set up context and kafka stream starting from a specific offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts,
+ Assign[String, String](
+ List(topicPartition),
+ kafkaParams.asScala,
+ Map(topicPartition -> 11L)))
+ s.consumer.poll(0)
+ assert(
+ s.consumer.position(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+ s
+ }
+
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.map(_.value).foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ // Test to verify the offset ranges can be recovered from the checkpoints
+ test("offset recovery") {
+ val topic = "recovery"
+ kafkaTestUtils.createTopic(topic)
+ testDir = Utils.createTempDir()
+
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+ // Send data to Kafka
+ def sendData(data: Seq[Int]) {
+ val strings = data.map { _.toString}
+ kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
+ }
+
+ // Set up the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val kafkaStream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ }
+ val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt }
+ val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+ Some(values.sum + state.getOrElse(0))
+ }
+ ssc.checkpoint(testDir.getAbsolutePath)
+
+ // This ensures that all the data is eventually received exactly once
+ stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+ rdd.collect().headOption.foreach { x =>
+ DirectKafkaStreamSuite.total.set(x._2)
+ }
+ }
+
+ ssc.start()
+
+ // Send some data
+ for (i <- (1 to 10).grouped(4)) {
+ sendData(i)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
+ }
+
+ ssc.stop()
+
+ // Verify that offset ranges were generated
+ val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ assert(
+ offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ "starting offset not zero"
+ )
+
+ logInfo("====== RESTARTING ========")
+
+ // Recover context from checkpoints
+ ssc = new StreamingContext(testDir.getAbsolutePath)
+ val recoveredStream =
+ ssc.graph.getInputStreams().head.asInstanceOf[DStream[ConsumerRecord[String, String]]]
+
+ // Verify offset ranges have been recovered
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
+ assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+ val earlierOffsetRanges = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ assert(
+ recoveredOffsetRanges.forall { or =>
+ earlierOffsetRanges.contains((or._1, or._2))
+ },
+ "Recovered ranges are not the same as the ones generated\n" +
+ earlierOffsetRanges + "\n" + recoveredOffsetRanges
+ )
+ // Restart context, give more data and verify the total at the end.
+ // If the total is right, that means each record has been received only once.
+ ssc.start()
+ for (i <- (11 to 20).grouped(4)) {
+ sendData(i)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
+ }
+ ssc.stop()
+ }
+
+ // Test to verify the offsets can be recovered from Kafka
+ test("offset recovery from kafka") {
+ val topic = "recoveryfromkafka"
+ kafkaTestUtils.createTopic(topic)
+
+ val kafkaParams = getKafkaParams(
+ "auto.offset.reset" -> "earliest",
+ ("enable.auto.commit", false: java.lang.Boolean)
+ )
+
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
+
+ // Send data to Kafka and wait for it to be received
+ def sendDataAndWaitForReceive(data: Seq[Int]) {
+ val strings = data.map { _.toString}
+ kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(strings.forall { collectedData.contains })
+ }
+ }
+
+ // Set up the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ withClue("Error creating direct stream") {
+ val kafkaStream = KafkaUtils.createDirectStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val data = rdd.map(_.value).collect()
+ collectedData.addAll(Arrays.asList(data: _*))
+ kafkaStream.asInstanceOf[CanCommitOffsets]
+ .commitAsync(offsets, new OffsetCommitCallback() {
+ def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception) {
+ if (null != e) {
+ logError("commit failed", e)
+ } else {
+ committed.putAll(m)
+ }
+ }
+ })
+ }
+ }
+ ssc.start()
+ // Send some data and wait for it to be received
+ for (i <- (1 to 10).grouped(4)) {
+ sendDataAndWaitForReceive(i)
+ }
+ ssc.stop()
+ assert(!committed.isEmpty)
+ val consumer = new KafkaConsumer[String, String](kafkaParams)
+ consumer.subscribe(Arrays.asList(topic))
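+ // poll() makes the consumer join the group and receive its partition assignment,
+ // so that position() below has something to report.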
+ consumer.poll(0)
+ committed.asScala.foreach {
+ case (k, v) =>
+ // commits are async, not exactly once
+ assert(v.offset > 0)
+ assert(consumer.position(k) >= v.offset)
+ }
+ }
+
+ test("Direct Kafka stream report input information") {
+ val topic = "report-test"
+ val data = Map("a" -> 7, "b" -> 9)
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, data)
+
+ val totalSent = data.values.sum
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+ import DirectKafkaStreamSuite._
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val collector = new InputInfoCollector
+ ssc.addStreamingListener(collector)
+
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ }
+
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]
+
+ stream.map(r => (r.key, r.value))
+ .foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
+
+ // Check the number of records collected by the StreamingListener.
+ assert(collector.numRecordsSubmitted.get() === totalSent)
+ assert(collector.numRecordsStarted.get() === totalSent)
+ assert(collector.numRecordsCompleted.get() === totalSent)
+ }
+ ssc.stop()
+ }
+
+ test("maxMessagesPerPartition with backpressure disabled") {
+ val topic = "maxMessagesPerPartition"
+ val kafkaStream = getDirectKafkaStream(topic, None)
+
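+ // With no rate controller the cap comes from spark.streaming.kafka.maxRatePerPartition
+ // (100 records/sec, set in getDirectKafkaStream) scaled by the 100 ms batch: 10 records.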
+ val input = Map(new TopicPartition(topic, 0) -> 50L, new TopicPartition(topic, 1) -> 50L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
+ }
+
+ test("maxMessagesPerPartition with no lag") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
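+ // With zero lag on every partition there is nothing to limit, so no
+ // per-partition maximum is expected.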
+ val input = Map(new TopicPartition(topic, 0) -> 0L, new TopicPartition(topic, 1) -> 0L)
+ assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+ }
+
+ test("maxMessagesPerPartition respects max rate") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
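+ // The rate controller reports 1000 records/sec, but maxRatePerPartition (100) is
+ // lower, so each partition is still capped at 10 records for the 100 ms batch.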
+ val input = Map(new TopicPartition(topic, 0) -> 1000L, new TopicPartition(topic, 1) -> 1000L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
+ }
+
+ test("using rate controller") {
+ val topic = "backpressure"
+ val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ kafkaTestUtils.createTopic(topic, 2)
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+ val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
+ KafkaUtils.fixKafkaParams(executorKafkaParams)
+
+ val batchIntervalMilliseconds = 100
+ val estimator = new ConstantEstimator(100)
+ val messages = Map("foo" -> 200)
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Set up the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ new DirectKafkaInputDStream[String, String](
+ ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) {
+ override protected[streaming] val rateController =
+ Some(new DirectKafkaRateController(id, estimator))
+ }.map(r => (r.key, r.value))
+ }
+
+ val collectedData = new ConcurrentLinkedQueue[Array[String]]()
+
+ // Used for assertion failure messages.
+ def dataToString: String =
+ collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ collectedData.add(data)
+ }
+
+ ssc.start()
+
+ // Try different rate limits.
+ // Wait for arrays of data to appear matching the rate.
+ Seq(100, 50, 20).foreach { rate =>
+ collectedData.clear() // Empty this buffer on each pass.
+ estimator.updateRate(rate) // Set a new rate.
+ // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+ val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+ eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+ // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+ // Funky "-" in message makes the complete assertion message read better.
+ assert(collectedData.asScala.exists(_.size == expectedSize),
+ s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+ }
+ }
+
+ ssc.stop()
+ }
+
+ /** Get the generated offset ranges from the DirectKafkaStream */
+ private def getOffsetRanges[K, V](
+ kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
+ kafkaStream.generatedRDDs.mapValues { rdd =>
+ rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ }.toSeq.sortBy { _._1 }
+ }
+
+ private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+ val batchIntervalMilliseconds = 100
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Set up the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+ val ekp = new JHashMap[String, Object](kafkaParams)
+ KafkaUtils.fixKafkaParams(ekp)
+
+ val s = new DirectKafkaInputDStream[String, String](
+ ssc,
+ preferredHosts,
+ new ConsumerStrategy[String, String] {
+ def executorKafkaParams = ekp
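+ // A minimal strategy for this test: assign two fixed partitions and seek to the
+ // beginning instead of subscribing by topic.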
+ def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = {
+ val consumer = new KafkaConsumer[String, String](kafkaParams)
+ val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ consumer.assign(Arrays.asList(tps: _*))
+ tps.foreach(tp => consumer.seek(tp, 0))
+ consumer
+ }
+ }
+ ) {
+ override protected[streaming] val rateController = mockRateController
+ }
+ // Manual start is necessary because we aren't consuming the stream, just checking its state.
+ s.start()
+ s
+ }
+}
+
+object DirectKafkaStreamSuite {
+ val total = new AtomicLong(-1L)
+
+ class InputInfoCollector extends StreamingListener {
+ val numRecordsSubmitted = new AtomicLong(0L)
+ val numRecordsStarted = new AtomicLong(0L)
+ val numRecordsCompleted = new AtomicLong(0L)
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+ numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+ numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
+ }
+ }
+}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+ extends RateEstimator {
+
+ def updateRate(newRate: Long): Unit = {
+ rate = newRate
+ }
+
+ def compute(
+ time: Long,
+ elements: Long,
+ processingDelay: Long,
+ schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ override def getLatestRate(): Long = rate
+}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..3d2546ddd9
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+
+class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ private val sparkConf = new SparkConf().setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ private var sc: SparkContext = _
+
+ override def beforeAll {
+ sc = new SparkContext(sparkConf)
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
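+ // A fresh group.id per call keeps runs from seeing each other's committed offsets.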
+ private def getKafkaParams() = Map[String, Object](
+ "bootstrap.servers" -> kafkaTestUtils.brokerAddress,
+ "key.deserializer" -> classOf[StringDeserializer],
+ "value.deserializer" -> classOf[StringDeserializer],
+ "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}"
+ ).asJava
+
+ private val preferredHosts = PreferConsistent
+
+ test("basic usage") {
+ val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
+ kafkaTestUtils.createTopic(topic)
+ val messages = Array("the", "quick", "brown", "fox")
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ val kafkaParams = getKafkaParams()
+
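+ // An OffsetRange is [fromOffset, untilOffset), so this covers exactly the messages
+ // sent above.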
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts)
+ .map(_.value)
+
+ val received = rdd.collect.toSet
+ assert(received === messages.toSet)
+
+ // size-related method optimizations return sane results
+ assert(rdd.count === messages.size)
+ assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+ assert(!rdd.isEmpty)
+ assert(rdd.take(1).size === 1)
+ assert(rdd.take(1).head === messages.head)
+ assert(rdd.take(messages.size + 10).size === messages.size)
+
+ val emptyRdd = KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts)
+
+ assert(emptyRdd.isEmpty)
+
+ // invalid offset ranges throw exceptions
+ val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+ intercept[SparkException] {
+ val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts)
+ .map(_.value)
+ .collect()
+ }
+ }
+
+ test("iterator boundary conditions") {
+ // The idea is to find, e.g., off-by-one errors between what Kafka has available and the RDD.
+ val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ kafkaTestUtils.createTopic(topic)
+
+ val kafkaParams = getKafkaParams()
+
+ // this is the "lots of messages" case
+ kafkaTestUtils.sendMessages(topic, sent)
+ var sentCount = sent.values.sum
+
+ val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+ Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)
+
+ val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
+
+ assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+ assert(rdd.map(_.offset).collect.sorted === (0 until sentCount).toArray,
+ "didn't get all sent messages")
+
+ // this is the "0 messages" case
+ val rdd2 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+ Array(OffsetRange(topic, 0, sentCount, sentCount)), preferredHosts)
+
+ // shouldn't get anything, since the message is sent after the RDD was defined
+ val sentOnlyOne = Map("d" -> 1)
+
+ kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+
+ assert(rdd2.map(_.value).collect.size === 0, "got messages when there shouldn't be any")
+
+ // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
+ val rdd3 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+ Array(OffsetRange(topic, 0, sentCount, sentCount + 1)), preferredHosts)
+
+ // Send lots of messages after the RDD was defined; they shouldn't show up.
+ kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
+
+ assert(rdd3.map(_.value).collect.head === sentOnlyOne.keys.head,
+ "didn't get exactly one message")
+ }
+
+ test("executor sorting") {
+ val kafkaParams = new ju.HashMap[String, Object](getKafkaParams())
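+ // "none" means any accidental fetch fails fast instead of resetting; this RDD is
+ // only used to exercise compareExecutors, never to read data.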
+ kafkaParams.put("auto.offset.reset", "none")
+ val rdd = new KafkaRDD[String, String](
+ sc,
+ kafkaParams,
+ Array(OffsetRange("unused", 0, 1, 2)),
+ ju.Collections.emptyMap[TopicPartition, String](),
+ true)
+ val a3 = ExecutorCacheTaskLocation("a", "3")
+ val a4 = ExecutorCacheTaskLocation("a", "4")
+ val b1 = ExecutorCacheTaskLocation("b", "1")
+ val b2 = ExecutorCacheTaskLocation("b", "2")
+
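+ // compareExecutors is expected to order by host and then by executor id, both
+ // descending, so every permutation should sort back to this sequence.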
+ val correct = Array(b2, b1, a4, a3)
+
+ correct.permutations.foreach { p =>
+ assert(p.sortWith(rdd.compareExecutors) === correct)
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 89ed87ff9e..c99d786b14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,8 @@
<module>launcher</module>
<module>external/kafka-0-8</module>
<module>external/kafka-0-8-assembly</module>
+ <module>external/kafka-0-10</module>
+ <module>external/kafka-0-10-assembly</module>
</modules>
<properties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4c01ad3c33..8e3dcc2f38 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -44,9 +44,9 @@ object BuildCommons {
).map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(
- streaming, streamingFlumeSink, streamingFlume, streamingKafka
+ streaming, streamingFlumeSink, streamingFlume, streamingKafka, streamingKafka010
) = Seq(
- "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8"
+ "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8", "streaming-kafka-0-10"
).map(ProjectRef(buildLocation, _))
val allProjects@Seq(
@@ -61,8 +61,8 @@ object BuildCommons {
Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests").map(ProjectRef(buildLocation, _))
- val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
- Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kinesis-asl-assembly")
+ val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
+ Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly")
.map(ProjectRef(buildLocation, _))
val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -352,7 +352,7 @@ object SparkBuild extends PomBuild {
val mimaProjects = allProjects.filterNot { x =>
Seq(
spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
- unsafe, tags, sketch, mllibLocal
+ unsafe, tags, sketch, mllibLocal, streamingKafka010
).contains(x)
}
@@ -608,7 +608,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
- if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+ if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kafka-0-10-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
// This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {