path: root/external/kafka-0-10
author     cody koeninger <cody@koeninger.org>  2016-07-01 00:53:36 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-07-01 00:53:36 -0700
commit     fbfd0ab9d70f557c38c7bb8e704475bf19adaf02 (patch)
tree       b5488034454172cc918fccda177e5b953d6f973e /external/kafka-0-10
parent     14cf61e909598d9f6b9c3b920de7299e9bc828e0 (diff)
download   spark-fbfd0ab9d70f557c38c7bb8e704475bf19adaf02.tar.gz
           spark-fbfd0ab9d70f557c38c7bb8e704475bf19adaf02.tar.bz2
           spark-fbfd0ab9d70f557c38c7bb8e704475bf19adaf02.zip
[SPARK-12177][STREAMING][KAFKA] limit api surface area
## What changes were proposed in this pull request?

This is an alternative to the refactoring proposed by https://github.com/apache/spark/pull/13996

## How was this patch tested?

Unit tests; also tested under Scala 2.10 via mvn -Dscala-2.10

Author: cody koeninger <cody@koeninger.org>

Closes #13998 from koeninger/kafka-0-10-refactor.
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala | 187
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala | 2
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 4
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 5
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala | 36
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala | 74
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala | 2
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java | 32
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java | 8
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java | 9
-rw-r--r--  external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java | 19
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 35
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala | 2
13 files changed, 222 insertions(+), 193 deletions(-)
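
For context, a minimal sketch (not part of this patch) of how the narrowed API is meant to be called after this change, assuming an existing StreamingContext named ssc; the entry points move from the Subscribe/Assign/PreferConsistent classes themselves to the ConsumerStrategies and LocationStrategies objects:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._

    // Driver-side consumer config; executors automatically get a "spark-executor-" prefixed group.id
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:port1,host2:port2",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("topicA", "topicB"), kafkaParams)
    )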
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 079a07dbc2..70c3f1a98d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka010
-import java.{ util => ju }
+import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
@@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
-trait ConsumerStrategy[K, V] {
+abstract class ConsumerStrategy[K, V] {
/**
- * Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
}
/**
- * :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
-@Experimental
-case class Subscribe[K, V] private(
- topics: ju.Collection[java.lang.String],
+private case class Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
+ offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
@@ -91,17 +90,51 @@ case class Subscribe[K, V] private(
}
/**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class Assign[K, V](
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]
+ ) extends ConsumerStrategy[K, V] {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.assign(topicPartitions)
+ if (currentOffsets.isEmpty) {
+ offsets.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+}
+
+/**
* :: Experimental ::
- * Companion object for creating [[Subscribe]] strategy
+ * object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
-object Subscribe {
+object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -111,14 +144,14 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
/**
@@ -126,20 +159,20 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
- kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/**
@@ -147,7 +180,7 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -157,11 +190,11 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, offsets)
}
/**
@@ -169,69 +202,25 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
- kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
- }
-
-}
-
-/**
- * :: Experimental ::
- * Assign a fixed collection of TopicPartitions
- * @param topicPartitions collection of TopicPartitions to assign
- * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
- * configuration parameters</a> to be used on driver. The same params will be used on executors,
- * with minor automatic modifications applied.
- * Requires "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsets: offsets to begin at on initial startup. If no offset is given for a
- * TopicPartition, the committed offset (if applicable) or kafka param
- * auto.offset.reset will be used.
- */
-@Experimental
-case class Assign[K, V] private(
- topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
- ) extends ConsumerStrategy[K, V] {
-
- def executorKafkaParams: ju.Map[String, Object] = kafkaParams
-
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
- consumer.assign(topicPartitions)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
- consumer.seek(topicPartition, offset)
- }
- }
-
- consumer
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
-}
-/**
- * :: Experimental ::
- * Companion object for creating [[Assign]] strategy
- */
-@Experimental
-object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -241,14 +230,14 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
/**
@@ -256,20 +245,20 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
- kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/**
@@ -277,7 +266,7 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -287,11 +276,11 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](topicPartitions, kafkaParams, offsets)
}
/**
@@ -299,16 +288,20 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
+ topicPartitions,
+ kafkaParams,
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
+
}
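
As an illustration (again not part of the diff), the Scala-friendly Assign overload defined above can be called like this, with hypothetical topic and offset values and reusing the kafkaParams map from the earlier sketch; Scala Longs are boxed to java.lang.Long by the overload itself:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.ConsumerStrategies

    val partitions = List(new TopicPartition("logs", 0), new TopicPartition("logs", 1))
    // start each partition at an explicit offset instead of the committed one
    val fromOffsets = Map(
      new TopicPartition("logs", 0) -> 100L,
      new TopicPartition("logs", 1) -> 250L)

    val strategy = ConsumerStrategies.Assign[String, String](partitions, kafkaParams, fromOffsets)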
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index acd1841d53..13827f68f2 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
- kc = consumerStrategy.onStart(currentOffsets)
+ kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
}
kc
}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index c15c163449..5b5a9ac48c 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
@@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
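
The default poll timeout above moves from 256 ms to 512 ms; it stays configurable through SparkConf under the same key, e.g. (illustrative value):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-0-10-example")
      // raise the per-poll timeout if brokers respond slowly; the new default in this patch is 512 ms
      .set("spark.streaming.kafka.consumer.poll.ms", "1024")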
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 13c08430db..19192e4b95 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -61,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
// Kafka broker related configurations
private val brokerHost = "localhost"
- private var brokerPort = 9092
+ private var brokerPort = 0
private var brokerConf: KafkaConfig = _
// Kafka broker server
@@ -110,7 +110,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
- (server, port)
+ brokerPort = server.boundPort()
+ (server, brokerPort)
}, new SparkConf(), "KafkaBroker")
brokerReady = true
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index c0524990bc..b2190bfa05 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream._
/**
* :: Experimental ::
- * Companion object for constructing Kafka streams and RDDs
+ * object for constructing Kafka streams and RDDs
*/
@Experimental
object KafkaUtils extends Logging {
@@ -44,12 +44,12 @@ object KafkaUtils extends Logging {
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -83,12 +83,12 @@ object KafkaUtils extends Logging {
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -110,10 +110,10 @@ object KafkaUtils extends Logging {
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
- * @param consumerStrategy In most cases, pass in [[Subscribe]],
- * see [[ConsumerStrategy]] for more details
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+ * see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -132,10 +132,10 @@ object KafkaUtils extends Logging {
* each given Kafka topic/partition corresponds to an RDD partition.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
- * @param consumerStrategy In most cases, pass in [[Subscribe]],
- * see [[ConsumerStrategy]] for more details
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+ * see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -161,7 +161,11 @@ object KafkaUtils extends Logging {
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
// driver and executor should be in different consumer groups
- val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+ val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+ if (null == originalGroupId) {
+ logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
+ }
+ val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
index df620300ea..c9a8a13f51 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -29,49 +29,57 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to schedule consumers for a given TopicPartition on an executor.
+ * See [[LocationStrategies]] to obtain instances.
* Kafka 0.10 consumers prefetch messages, so it's important for performance
* to keep cached consumers on appropriate executors, not recreate them for every partition.
* Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
*/
@Experimental
-sealed trait LocationStrategy
+sealed abstract class LocationStrategy
-/**
- * :: Experimental ::
- * Use this only if your executors are on the same nodes as your Kafka brokers.
- */
-@Experimental
-case object PreferBrokers extends LocationStrategy {
- def create: PreferBrokers.type = this
-}
+private case object PreferBrokers extends LocationStrategy
-/**
- * :: Experimental ::
- * Use this in most cases, it will consistently distribute partitions across all executors.
- */
-@Experimental
-case object PreferConsistent extends LocationStrategy {
- def create: PreferConsistent.type = this
-}
+private case object PreferConsistent extends LocationStrategy
-/**
- * :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
- */
-@Experimental
-case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
+private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
/**
- * :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
+ * :: Experimental :: object to obtain instances of [[LocationStrategy]]
+ *
*/
@Experimental
-object PreferFixed {
- def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
- PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
- }
- def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
- PreferFixed(hostMap)
+object LocationStrategies {
+ /**
+ * :: Experimental ::
+ * Use this only if your executors are on the same nodes as your Kafka brokers.
+ */
+ @Experimental
+ def PreferBrokers: LocationStrategy =
+ org.apache.spark.streaming.kafka010.PreferBrokers
+
+ /**
+ * :: Experimental ::
+ * Use this in most cases, it will consistently distribute partitions across all executors.
+ */
+ @Experimental
+ def PreferConsistent: LocationStrategy =
+ org.apache.spark.streaming.kafka010.PreferConsistent
+
+ /**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+ @Experimental
+ def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+
+ /**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+ @Experimental
+ def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(hostMap)
}
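
For illustration (not part of the patch), the new PreferFixed entry point can be used like this with hypothetical host names; any TopicPartition left out of the map falls back to consistent placement:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.{LocationStrategies, LocationStrategy}

    // pin hot partitions to specific executor hosts when load is uneven
    val pinned: LocationStrategy = LocationStrategies.PreferFixed(Map(
      new TopicPartition("logs", 0) -> "host-a",
      new TopicPartition("logs", 1) -> "host-b"))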
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 2bfc1e84d7..09db6d6062 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -20,4 +20,4 @@ package org.apache.spark.streaming
/**
* Spark Integration for Kafka 0.10
*/
-package object kafka
+package object kafka010 //scalastyle:ignore
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index 8d7c05b5a6..ac8d64b180 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -44,37 +44,39 @@ public class JavaConsumerStrategySuite implements Serializable {
kafkaParams.put("bootstrap.servers", "not used");
final scala.collection.Map<String, Object> sKafkaParams =
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
- final Map<TopicPartition, Object> offsets = new HashMap<>();
+ final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 23L);
final scala.collection.Map<TopicPartition, Object> sOffsets =
- JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+ JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
+ new scala.runtime.AbstractFunction1<Long, Object>() {
+ @Override
+ public Object apply(Long x) {
+ return (Object) x;
+ }
+ }
+ );
- // make sure constructors can be called from java
- // final ConsumerStrategy<String, String> sub0 = // does not compile in Scala 2.10
- // Subscribe.<String, String>apply(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub1 =
- Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
+ ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> sub2 =
- Subscribe.<String, String>apply(sTopics, sKafkaParams);
+ ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams);
final ConsumerStrategy<String, String> sub3 =
- Subscribe.<String, String>create(topics, kafkaParams, offsets);
+ ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub4 =
- Subscribe.<String, String>create(topics, kafkaParams);
+ ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams);
Assert.assertEquals(
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
- // final ConsumerStrategy<String, String> asn0 = // does not compile in Scala 2.10
- // Assign.<String, String>apply(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn1 =
- Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
+ ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
- Assign.<String, String>apply(sParts, sKafkaParams);
+ ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams);
final ConsumerStrategy<String, String> asn3 =
- Assign.<String, String>create(parts, kafkaParams, offsets);
+ ConsumerStrategies.<String, String>Assign(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn4 =
- Assign.<String, String>create(parts, kafkaParams);
+ ConsumerStrategies.<String, String>Assign(parts, kafkaParams);
Assert.assertEquals(
asn1.executorKafkaParams().get("bootstrap.servers"),
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
index e57ede7afa..dc9c13ba86 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -90,8 +90,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
ssc,
- PreferConsistent.create(),
- Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams)
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
);
JavaDStream<String> stream1 = istream1.transform(
@@ -123,8 +123,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
ssc,
- PreferConsistent.create(),
- Subscribe.<String, String>create(Arrays.asList(topic2), kafkaParams2)
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), kafkaParams2)
);
JavaDStream<String> stream2 = istream2.transform(
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
index 548ba134dc..87bfe1514e 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Random;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
@@ -65,6 +66,8 @@ public class JavaKafkaRDDSuite implements Serializable {
String topic1 = "topic1";
String topic2 = "topic2";
+ Random random = new Random();
+
createTopicAndSendData(topic1);
createTopicAndSendData(topic2);
@@ -72,6 +75,8 @@ public class JavaKafkaRDDSuite implements Serializable {
kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
+ kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
+ "-" + System.currentTimeMillis());
OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
@@ -96,14 +101,14 @@ public class JavaKafkaRDDSuite implements Serializable {
sc,
kafkaParams,
offsetRanges,
- PreferFixed.create(leaders)
+ LocationStrategies.PreferFixed(leaders)
).map(handler);
JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
sc,
kafkaParams,
offsetRanges,
- PreferConsistent.create()
+ LocationStrategies.PreferConsistent()
).map(handler);
// just making sure the java user apis work; the scala tests handle logic corner cases
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
index 7873c09e1a..41ccb0ebe7 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
@@ -41,18 +41,19 @@ public class JavaLocationStrategySuite implements Serializable {
JavaConverters.mapAsScalaMapConverter(hosts).asScala();
// make sure constructors can be called from java
- final LocationStrategy c1 = PreferConsistent.create();
- final LocationStrategy c2 = PreferConsistent$.MODULE$;
- Assert.assertEquals(c1, c2);
+ final LocationStrategy c1 = LocationStrategies.PreferConsistent();
+ final LocationStrategy c2 = LocationStrategies.PreferConsistent();
+ Assert.assertSame(c1, c2);
- final LocationStrategy c3 = PreferBrokers.create();
- final LocationStrategy c4 = PreferBrokers$.MODULE$;
- Assert.assertEquals(c3, c4);
+ final LocationStrategy c3 = LocationStrategies.PreferBrokers();
+ final LocationStrategy c4 = LocationStrategies.PreferBrokers();
+ Assert.assertSame(c3, c4);
- final LocationStrategy c5 = PreferFixed.create(hosts);
- final LocationStrategy c6 = PreferFixed.apply(sHosts);
- Assert.assertEquals(c5, c6);
+ Assert.assertNotSame(c1, c3);
+ final LocationStrategy c5 = LocationStrategies.PreferFixed(hosts);
+ final LocationStrategy c6 = LocationStrategies.PreferFixed(sHosts);
+ Assert.assertEquals(c5, c6);
}
}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 776d11ad2f..0a53259802 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.kafka010
import java.io.File
+import java.lang.{ Long => JLong }
import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue
@@ -93,7 +94,7 @@ class DirectKafkaStreamSuite
kp
}
- val preferredHosts = PreferConsistent
+ val preferredHosts = LocationStrategies.PreferConsistent
test("basic stream receiving with multiple topics and smallest starting offset") {
val topics = List("basic1", "basic2", "basic3")
@@ -108,7 +109,9 @@ class DirectKafkaStreamSuite
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala))
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
@@ -178,7 +181,9 @@ class DirectKafkaStreamSuite
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala))
s.consumer.poll(0)
assert(
s.consumer.position(topicPartition) >= offsetBeforeStart,
@@ -225,8 +230,10 @@ class DirectKafkaStreamSuite
// Setup context and kafka stream with largest offset
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
- val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts,
- Assign[String, String](
+ val s = new DirectKafkaInputDStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Assign[String, String](
List(topicPartition),
kafkaParams.asScala,
Map(topicPartition -> 11L)))
@@ -267,7 +274,9 @@ class DirectKafkaStreamSuite
ssc = new StreamingContext(sparkConf, Milliseconds(100))
val kafkaStream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala))
}
val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt }
val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
@@ -360,7 +369,9 @@ class DirectKafkaStreamSuite
ssc = new StreamingContext(sparkConf, Milliseconds(100))
withClue("Error creating direct stream") {
val kafkaStream = KafkaUtils.createDirectStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala))
kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val data = rdd.map(_.value).collect()
@@ -412,7 +423,9 @@ class DirectKafkaStreamSuite
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]
@@ -486,7 +499,9 @@ class DirectKafkaStreamSuite
val kafkaStream = withClue("Error creating direct stream") {
new DirectKafkaInputDStream[String, String](
- ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) {
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) {
override protected[streaming] val rateController =
Some(new DirectKafkaRateController(id, estimator))
}.map(r => (r.key, r.value))
@@ -552,7 +567,7 @@ class DirectKafkaStreamSuite
preferredHosts,
new ConsumerStrategy[String, String] {
def executorKafkaParams = ekp
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = {
+ def onStart(currentOffsets: JMap[TopicPartition, JLong]): Consumer[String, String] = {
val consumer = new KafkaConsumer[String, String](kafkaParams)
val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
consumer.assign(Arrays.asList(tps: _*))
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 3d2546ddd9..be373af059 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -62,7 +62,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
"group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}"
).asJava
- private val preferredHosts = PreferConsistent
+ private val preferredHosts = LocationStrategies.PreferConsistent
test("basic usage") {
val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"