Diffstat (limited to 'external/kafka-0-10/src/main/scala')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala        | 187
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala |   2
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala                |   4
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala          |   5
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala              |  36
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala        |  74
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala                 |   2
7 files changed, 158 insertions(+), 152 deletions(-)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 079a07dbc2..70c3f1a98d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka010
-import java.{ util => ju }
+import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
@@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
-trait ConsumerStrategy[K, V] {
+abstract class ConsumerStrategy[K, V] {
/**
- * Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
}
/**
- * :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
-@Experimental
-case class Subscribe[K, V] private(
- topics: ju.Collection[java.lang.String],
+private case class Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
+ offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
@@ -91,17 +90,51 @@ case class Subscribe[K, V] private(
}
/**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class Assign[K, V](
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]
+ ) extends ConsumerStrategy[K, V] {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.assign(topicPartitions)
+ if (currentOffsets.isEmpty) {
+ offsets.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+}
+
+/**
* :: Experimental ::
- * Companion object for creating [[Subscribe]] strategy
+ * Object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
-object Subscribe {
+object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -111,14 +144,14 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
/**
@@ -126,20 +159,20 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
- kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/**
@@ -147,7 +180,7 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -157,11 +190,11 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, offsets)
}
/**
@@ -169,69 +202,25 @@ object Subscribe {
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
- kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
- }
-
-}
-
-/**
- * :: Experimental ::
- * Assign a fixed collection of TopicPartitions
- * @param topicPartitions collection of TopicPartitions to assign
- * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
- * configuration parameters</a> to be used on driver. The same params will be used on executors,
- * with minor automatic modifications applied.
- * Requires "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsets: offsets to begin at on initial startup. If no offset is given for a
- * TopicPartition, the committed offset (if applicable) or kafka param
- * auto.offset.reset will be used.
- */
-@Experimental
-case class Assign[K, V] private(
- topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
- ) extends ConsumerStrategy[K, V] {
-
- def executorKafkaParams: ju.Map[String, Object] = kafkaParams
-
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
- consumer.assign(topicPartitions)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
- consumer.seek(topicPartition, offset)
- }
- }
-
- consumer
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
-}
-/**
- * :: Experimental ::
- * Companion object for creating [[Assign]] strategy
- */
-@Experimental
-object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -241,14 +230,14 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
/**
@@ -256,20 +245,20 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
- kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/**
@@ -277,7 +266,7 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -287,11 +276,11 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](topicPartitions, kafkaParams, offsets)
}
/**
@@ -299,16 +288,20 @@ object Assign {
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
+ topicPartitions,
+ kafkaParams,
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
+
}
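
For orientation, a minimal caller-side sketch of what this refactor means for
user code, assuming `topics` and `kafkaParams` are defined as in the Spark
streaming guide; the Subscribe/Assign case classes become private, and callers
go through the ConsumerStrategies factory methods instead:

    import org.apache.spark.streaming.kafka010.ConsumerStrategies
    import org.apache.kafka.common.serialization.StringDeserializer

    val topics = List("topicA", "topicB")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group")

    // Before this commit: Subscribe[String, String](topics, kafkaParams)
    // After: the factory method returns the opaque ConsumerStrategy[K, V] type
    val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)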
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index acd1841d53..13827f68f2 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
- kc = consumerStrategy.onStart(currentOffsets)
+ kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
}
kc
}
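
The driver-side change above follows a standard boxing pattern, since
onStart now takes a java.util.Map with boxed keys. A self-contained sketch of
the same Scala-to-Java offset-map conversion, with hypothetical values:

    import java.{lang => jl, util => ju}
    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition

    val currentOffsets: Map[TopicPartition, Long] =
      Map(new TopicPartition("topicA", 0) -> 42L)

    // scala.Long is a primitive, so each value is boxed to java.lang.Long
    // before the map itself is wrapped as a java.util.Map
    val javaOffsets: ju.Map[TopicPartition, jl.Long] =
      currentOffsets.mapValues(l => new jl.Long(l)).asJava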
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index c15c163449..5b5a9ac48c 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
@@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
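
If 512 ms is still too tight for a given cluster, the timeout remains
configurable; a hypothetical override via SparkConf:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-example")
      // raise the executor-side poll timeout beyond the new 512 ms default
      .set("spark.streaming.kafka.consumer.poll.ms", "1024")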
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 13c08430db..19192e4b95 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -61,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
// Kafka broker related configurations
private val brokerHost = "localhost"
- private var brokerPort = 9092
+ private var brokerPort = 0
private var brokerConf: KafkaConfig = _
// Kafka broker server
@@ -110,7 +110,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
- (server, port)
+ brokerPort = server.boundPort()
+ (server, brokerPort)
}, new SparkConf(), "KafkaBroker")
brokerReady = true
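
Binding to port 0 delegates port selection to the OS, which is what makes the
test broker safe to run concurrently instead of colliding on a fixed 9092. The
same idea in plain JDK terms, illustrative only:

    import java.net.ServerSocket

    val socket = new ServerSocket(0)     // 0 = let the OS pick a free port
    val boundPort = socket.getLocalPort  // read back the assigned port
    socket.close()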
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index c0524990bc..b2190bfa05 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream._
/**
* :: Experimental ::
- * Companion object for constructing Kafka streams and RDDs
+ * Object for constructing Kafka streams and RDDs
*/
@Experimental
object KafkaUtils extends Logging {
@@ -44,12 +44,12 @@ object KafkaUtils extends Logging {
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
+ * @param locationStrategy In most cases, pass in LocationStrategies.PreferConsistent,
+ * see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -83,12 +83,12 @@ object KafkaUtils extends Logging {
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
+ * @param locationStrategy In most cases, pass in LocationStrategies.PreferConsistent,
+ * see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -110,10 +110,10 @@ object KafkaUtils extends Logging {
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
- * @param consumerStrategy In most cases, pass in [[Subscribe]],
- * see [[ConsumerStrategy]] for more details
+ * @param locationStrategy In most cases, pass in LocationStrategies.PreferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.Subscribe,
+ * see [[ConsumerStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -132,10 +132,10 @@ object KafkaUtils extends Logging {
* each given Kafka topic/partition corresponds to an RDD partition.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
- * see [[LocationStrategy]] for more details.
- * @param consumerStrategy In most cases, pass in [[Subscribe]],
- * see [[ConsumerStrategy]] for more details
+ * @param locationStrategy In most cases, pass in LocationStrategies.PreferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.Subscribe,
+ * see [[ConsumerStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -161,7 +161,11 @@ object KafkaUtils extends Logging {
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
// driver and executor should be in different consumer groups
- val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+ val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+ if (null == originalGroupId) {
+ logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
+ }
+ val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
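
Putting the renamed factories together, a hypothetical end-to-end stream setup
(`ssc`, `topics`, and `kafkaParams` assumed to exist, with "group.id" set in
kafkaParams so the null-check above stays quiet):

    import org.apache.spark.streaming.kafka010._

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // each record is an org.apache.kafka.clients.consumer.ConsumerRecord
    stream.map(record => (record.key, record.value))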
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
index df620300ea..c9a8a13f51 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -29,49 +29,57 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to schedule consumers for a given TopicPartition on an executor.
+ * See [[LocationStrategies]] to obtain instances.
* Kafka 0.10 consumers prefetch messages, so it's important for performance
* to keep cached consumers on appropriate executors, not recreate them for every partition.
* Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
*/
@Experimental
-sealed trait LocationStrategy
+sealed abstract class LocationStrategy
-/**
- * :: Experimental ::
- * Use this only if your executors are on the same nodes as your Kafka brokers.
- */
-@Experimental
-case object PreferBrokers extends LocationStrategy {
- def create: PreferBrokers.type = this
-}
+private case object PreferBrokers extends LocationStrategy
-/**
- * :: Experimental ::
- * Use this in most cases, it will consistently distribute partitions across all executors.
- */
-@Experimental
-case object PreferConsistent extends LocationStrategy {
- def create: PreferConsistent.type = this
-}
+private case object PreferConsistent extends LocationStrategy
-/**
- * :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
- */
-@Experimental
-case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
+private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
/**
- * :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
+ * :: Experimental ::
+ * Object for obtaining instances of [[LocationStrategy]]
*/
@Experimental
-object PreferFixed {
- def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
- PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
- }
- def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
- PreferFixed(hostMap)
+object LocationStrategies {
+ /**
+ * :: Experimental ::
+ * Use this only if your executors are on the same nodes as your Kafka brokers.
+ */
+ @Experimental
+ def PreferBrokers: LocationStrategy =
+ org.apache.spark.streaming.kafka010.PreferBrokers
+
+ /**
+ * :: Experimental ::
+ * Use this in most cases, it will consistently distribute partitions across all executors.
+ */
+ @Experimental
+ def PreferConsistent: LocationStrategy =
+ org.apache.spark.streaming.kafka010.PreferConsistent
+
+ /**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+ @Experimental
+ def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+
+ /**
+ * :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+ @Experimental
+ def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(hostMap)
}
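
A short sketch of choosing among the new factory methods, with illustrative
hostnames:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.LocationStrategies

    // Most jobs: distribute partitions evenly across executors
    val consistent = LocationStrategies.PreferConsistent

    // Uneven load: pin hot partitions to specific executor hosts;
    // unlisted partitions fall back to consistent placement
    val fixed = LocationStrategies.PreferFixed(
      Map(new TopicPartition("topicA", 0) -> "host1",
          new TopicPartition("topicA", 1) -> "host2"))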
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 2bfc1e84d7..09db6d6062 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -20,4 +20,4 @@ package org.apache.spark.streaming
/**
* Spark Integration for Kafka 0.10
*/
-package object kafka
+package object kafka010 //scalastyle:ignore