From fbfd0ab9d70f557c38c7bb8e704475bf19adaf02 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 Jul 2016 00:53:36 -0700 Subject: [SPARK-12177][STREAMING][KAFKA] limit api surface area ## What changes were proposed in this pull request? This is an alternative to the refactoring proposed by https://github.com/apache/spark/pull/13996 ## How was this patch tested? unit tests also tested under scala 2.10 via mvn -Dscala-2.10 Author: cody koeninger Closes #13998 from koeninger/kafka-0-10-refactor. --- .../streaming/kafka010/ConsumerStrategy.scala | 187 ++++++++++----------- .../kafka010/DirectKafkaInputDStream.scala | 2 +- .../apache/spark/streaming/kafka010/KafkaRDD.scala | 4 +- .../spark/streaming/kafka010/KafkaTestUtils.scala | 5 +- .../spark/streaming/kafka010/KafkaUtils.scala | 36 ++-- .../streaming/kafka010/LocationStrategy.scala | 74 ++++---- .../apache/spark/streaming/kafka010/package.scala | 2 +- .../kafka010/JavaConsumerStrategySuite.java | 32 ++-- .../kafka010/JavaDirectKafkaStreamSuite.java | 8 +- .../streaming/kafka010/JavaKafkaRDDSuite.java | 9 +- .../kafka010/JavaLocationStrategySuite.java | 19 ++- .../kafka010/DirectKafkaStreamSuite.scala | 35 ++-- .../spark/streaming/kafka010/KafkaRDDSuite.scala | 2 +- 13 files changed, 222 insertions(+), 193 deletions(-) (limited to 'external/kafka-0-10') diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 079a07dbc2..70c3f1a98d 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.{ util => ju } +import java.{ lang => jl, util => ju } import scala.collection.JavaConverters._ @@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * See [[ConsumerStrategies]] to obtain instances. * Kafka 0.10 consumers can require additional, sometimes complex, setup after object * instantiation. This interface encapsulates that process, and allows it to be checkpointed. * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @Experimental -trait ConsumerStrategy[K, V] { +abstract class ConsumerStrategy[K, V] { /** - * Kafka + * Kafka * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] { * has successfully read. Will be empty on initial start, possibly non-empty on restart from * checkpoint. */ - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] } /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. 
*/ -@Experimental -case class Subscribe[K, V] private( - topics: ju.Collection[java.lang.String], +private case class Subscribe[K, V]( + topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] + offsets: ju.Map[TopicPartition, jl.Long] ) extends ConsumerStrategy[K, V] { def executorKafkaParams: ju.Map[String, Object] = kafkaParams - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { val consumer = new KafkaConsumer[K, V](kafkaParams) consumer.subscribe(topics) if (currentOffsets.isEmpty) { @@ -90,18 +89,52 @@ case class Subscribe[K, V] private( } } +/** + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * + * configuration parameters to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ +private case class Assign[K, V]( + topicPartitions: ju.Collection[TopicPartition], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, jl.Long] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { + val consumer = new KafkaConsumer[K, V](kafkaParams) + consumer.assign(topicPartitions) + if (currentOffsets.isEmpty) { + offsets.asScala.foreach { case (topicPartition, offset) => + consumer.seek(topicPartition, offset) + } + } + + consumer + } +} + /** * :: Experimental :: - * Companion object for creating [[Subscribe]] strategy + * object for obtaining instances of [[ConsumerStrategy]] */ @Experimental -object Subscribe { +object ConsumerStrategies { /** * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -111,14 +144,14 @@ object Subscribe { * auto.offset.reset will be used. */ @Experimental - def apply[K, V]( - topics: Iterable[java.lang.String], + def Subscribe[K, V]( + topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = { - Subscribe[K, V]( + offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { + new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } /** @@ -126,20 +159,20 @@ object Subscribe { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. 
* Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def apply[K, V]( - topics: Iterable[java.lang.String], - kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = { - Subscribe[K, V]( + def Subscribe[K, V]( + topics: Iterable[jl.String], + kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { + new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } /** @@ -147,7 +180,7 @@ object Subscribe { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -157,11 +190,11 @@ object Subscribe { * auto.offset.reset will be used. */ @Experimental - def create[K, V]( - topics: ju.Collection[java.lang.String], + def Subscribe[K, V]( + topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = { - Subscribe[K, V](topics, kafkaParams, offsets) + offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { + new Subscribe[K, V](topics, kafkaParams, offsets) } /** @@ -169,69 +202,25 @@ object Subscribe { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def create[K, V]( - topics: ju.Collection[java.lang.String], - kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = { - Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) - } - -} - -/** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. 
- */ -@Experimental -case class Assign[K, V] private( - topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] - ) extends ConsumerStrategy[K, V] { - - def executorKafkaParams: ju.Map[String, Object] = kafkaParams - - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) - consumer.assign(topicPartitions) - if (currentOffsets.isEmpty) { - offsets.asScala.foreach { case (topicPartition, offset) => - consumer.seek(topicPartition, offset) - } - } - - consumer + def Subscribe[K, V]( + topics: ju.Collection[jl.String], + kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { + new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]()) } -} -/** - * :: Experimental :: - * Companion object for creating [[Assign]] strategy - */ -@Experimental -object Assign { /** * :: Experimental :: * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -241,14 +230,14 @@ object Assign { * auto.offset.reset will be used. */ @Experimental - def apply[K, V]( + def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = { - Assign[K, V]( + offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { + new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } /** @@ -256,20 +245,20 @@ object Assign { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def apply[K, V]( + def Assign[K, V]( topicPartitions: Iterable[TopicPartition], - kafkaParams: collection.Map[String, Object]): Assign[K, V] = { - Assign[K, V]( + kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { + new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } /** @@ -277,7 +266,7 @@ object Assign { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -287,11 +276,11 @@ object Assign { * auto.offset.reset will be used. 
*/ @Experimental - def create[K, V]( + def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, offsets) + offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { + new Assign[K, V](topicPartitions, kafkaParams, offsets) } /** @@ -299,16 +288,20 @@ object Assign { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def create[K, V]( + def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object]): Assign[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) + kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { + new Assign[K, V]( + topicPartitions, + kafkaParams, + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } + } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index acd1841d53..13827f68f2 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( @transient private var kc: Consumer[K, V] = null def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { - kc = consumerStrategy.onStart(currentOffsets) + kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava) } kc } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index c15c163449..5b5a9ac48c 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD @@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V]( " must be set to false for executor kafka params, else offsets may commit before processing") // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? 
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512) private val cacheInitialCapacity = conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) private val cacheMaxCapacity = diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 13c08430db..19192e4b95 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -61,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + private var brokerPort = 0 private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -110,7 +110,8 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - (server, port) + brokerPort = server.boundPort() + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index c0524990bc..b2190bfa05 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream._ /** * :: Experimental :: - * Companion object for constructing Kafka streams and RDDs + * object for constructing Kafka streams and RDDs */ @Experimental object KafkaUtils extends Logging { @@ -44,12 +44,12 @@ object KafkaUtils extends Logging { * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -83,12 +83,12 @@ object KafkaUtils extends Logging { * @param keyClass Class of the keys in the Kafka records * @param valueClass Class of the values in the Kafka records * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. 
* @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -110,10 +110,10 @@ object KafkaUtils extends Logging { * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number * of messages * per second that each '''partition''' will accept. - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. + * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe, + * see [[ConsumerStrategies]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -132,10 +132,10 @@ object KafkaUtils extends Logging { * each given Kafka topic/partition corresponds to an RDD partition. * @param keyClass Class of the keys in the Kafka records * @param valueClass Class of the values in the Kafka records - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. + * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe, + * see [[ConsumerStrategies]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -161,7 +161,11 @@ object KafkaUtils extends Logging { kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") // driver and executor should be in different consumer groups - val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + if (null == originalGroupId) { + logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it") + } + val groupId = "spark-executor-" + originalGroupId logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index df620300ea..c9a8a13f51 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -29,49 +29,57 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Choice of how to schedule consumers for a given TopicPartition on an executor. + * See [[LocationStrategies]] to obtain instances. * Kafka 0.10 consumers prefetch messages, so it's important for performance * to keep cached consumers on appropriate executors, not recreate them for every partition. * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. */ @Experimental -sealed trait LocationStrategy +sealed abstract class LocationStrategy -/** - * :: Experimental :: - * Use this only if your executors are on the same nodes as your Kafka brokers. 
- */ -@Experimental -case object PreferBrokers extends LocationStrategy { - def create: PreferBrokers.type = this -} +private case object PreferBrokers extends LocationStrategy -/** - * :: Experimental :: - * Use this in most cases, it will consistently distribute partitions across all executors. - */ -@Experimental -case object PreferConsistent extends LocationStrategy { - def create: PreferConsistent.type = this -} +private case object PreferConsistent extends LocationStrategy -/** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. - */ -@Experimental -case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy +private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy /** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. + * :: Experimental :: object to obtain instances of [[LocationStrategy]] + * */ @Experimental -object PreferFixed { - def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = { - PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) - } - def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed = - PreferFixed(hostMap) +object LocationStrategies { + /** + * :: Experimental :: + * Use this only if your executors are on the same nodes as your Kafka brokers. + */ + @Experimental + def PreferBrokers: LocationStrategy = + org.apache.spark.streaming.kafka010.PreferBrokers + + /** + * :: Experimental :: + * Use this in most cases, it will consistently distribute partitions across all executors. + */ + @Experimental + def PreferConsistent: LocationStrategy = + org.apache.spark.streaming.kafka010.PreferConsistent + + /** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ + @Experimental + def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = + new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) + + /** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. 
+ */ + @Experimental + def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = + new PreferFixed(hostMap) } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala index 2bfc1e84d7..09db6d6062 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala @@ -20,4 +20,4 @@ package org.apache.spark.streaming /** * Spark Integration for Kafka 0.10 */ -package object kafka +package object kafka010 //scalastyle:ignore diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index 8d7c05b5a6..ac8d64b180 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -44,37 +44,39 @@ public class JavaConsumerStrategySuite implements Serializable { kafkaParams.put("bootstrap.servers", "not used"); final scala.collection.Map sKafkaParams = JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); - final Map offsets = new HashMap<>(); + final Map offsets = new HashMap<>(); offsets.put(tp1, 23L); final scala.collection.Map sOffsets = - JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues( + new scala.runtime.AbstractFunction1() { + @Override + public Object apply(Long x) { + return (Object) x; + } + } + ); - // make sure constructors can be called from java - // final ConsumerStrategy sub0 = // does not compile in Scala 2.10 - // Subscribe.apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = - Subscribe.apply(sTopics, sKafkaParams, sOffsets); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = - Subscribe.apply(sTopics, sKafkaParams); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams); final ConsumerStrategy sub3 = - Subscribe.create(topics, kafkaParams, offsets); + ConsumerStrategies.Subscribe(topics, kafkaParams, offsets); final ConsumerStrategy sub4 = - Subscribe.create(topics, kafkaParams); + ConsumerStrategies.Subscribe(topics, kafkaParams); Assert.assertEquals( sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); - // final ConsumerStrategy asn0 = // does not compile in Scala 2.10 - // Assign.apply(parts, kafkaParams, offsets); final ConsumerStrategy asn1 = - Assign.apply(sParts, sKafkaParams, sOffsets); + ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - Assign.apply(sParts, sKafkaParams); + ConsumerStrategies.Assign(sParts, sKafkaParams); final ConsumerStrategy asn3 = - Assign.create(parts, kafkaParams, offsets); + ConsumerStrategies.Assign(parts, kafkaParams, offsets); final ConsumerStrategy asn4 = - Assign.create(parts, kafkaParams); + ConsumerStrategies.Assign(parts, kafkaParams); Assert.assertEquals( asn1.executorKafkaParams().get("bootstrap.servers"), diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java index 
e57ede7afa..dc9c13ba86 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -90,8 +90,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable { JavaInputDStream> istream1 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), - Subscribe.create(Arrays.asList(topic1), kafkaParams) + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Arrays.asList(topic1), kafkaParams) ); JavaDStream stream1 = istream1.transform( @@ -123,8 +123,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable { JavaInputDStream> istream2 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), - Subscribe.create(Arrays.asList(topic2), kafkaParams2) + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Arrays.asList(topic2), kafkaParams2) ); JavaDStream stream2 = istream2.transform( diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index 548ba134dc..87bfe1514e 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Random; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.TopicPartition; @@ -65,6 +66,8 @@ public class JavaKafkaRDDSuite implements Serializable { String topic1 = "topic1"; String topic2 = "topic2"; + Random random = new Random(); + createTopicAndSendData(topic1); createTopicAndSendData(topic2); @@ -72,6 +75,8 @@ public class JavaKafkaRDDSuite implements Serializable { kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress()); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); + kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() + + "-" + System.currentTimeMillis()); OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), @@ -96,14 +101,14 @@ public class JavaKafkaRDDSuite implements Serializable { sc, kafkaParams, offsetRanges, - PreferFixed.create(leaders) + LocationStrategies.PreferFixed(leaders) ).map(handler); JavaRDD rdd2 = KafkaUtils.createRDD( sc, kafkaParams, offsetRanges, - PreferConsistent.create() + LocationStrategies.PreferConsistent() ).map(handler); // just making sure the java user apis work; the scala tests handle logic corner cases diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java index 7873c09e1a..41ccb0ebe7 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java @@ -41,18 +41,19 @@ public class JavaLocationStrategySuite implements Serializable { JavaConverters.mapAsScalaMapConverter(hosts).asScala(); // make sure constructors can be called from java - final LocationStrategy c1 = 
PreferConsistent.create(); - final LocationStrategy c2 = PreferConsistent$.MODULE$; - Assert.assertEquals(c1, c2); + final LocationStrategy c1 = LocationStrategies.PreferConsistent(); + final LocationStrategy c2 = LocationStrategies.PreferConsistent(); + Assert.assertSame(c1, c2); - final LocationStrategy c3 = PreferBrokers.create(); - final LocationStrategy c4 = PreferBrokers$.MODULE$; - Assert.assertEquals(c3, c4); + final LocationStrategy c3 = LocationStrategies.PreferBrokers(); + final LocationStrategy c4 = LocationStrategies.PreferBrokers(); + Assert.assertSame(c3, c4); - final LocationStrategy c5 = PreferFixed.create(hosts); - final LocationStrategy c6 = PreferFixed.apply(sHosts); - Assert.assertEquals(c5, c6); + Assert.assertNotSame(c1, c3); + final LocationStrategy c5 = LocationStrategies.PreferFixed(hosts); + final LocationStrategy c6 = LocationStrategies.PreferFixed(sHosts); + Assert.assertEquals(c5, c6); } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 776d11ad2f..0a53259802 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kafka010 import java.io.File +import java.lang.{ Long => JLong } import java.util.{ Arrays, HashMap => JHashMap, Map => JMap } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.ConcurrentLinkedQueue @@ -93,7 +94,7 @@ class DirectKafkaStreamSuite kp } - val preferredHosts = PreferConsistent + val preferredHosts = LocationStrategies.PreferConsistent test("basic stream receiving with multiple topics and smallest starting offset") { val topics = List("basic1", "basic2", "basic3") @@ -108,7 +109,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)]() @@ -178,7 +181,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { val s = new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) s.consumer.poll(0) assert( s.consumer.position(topicPartition) >= offsetBeforeStart, @@ -225,8 +230,10 @@ class DirectKafkaStreamSuite // Setup context and kafka stream with largest offset ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { - val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts, - Assign[String, String]( + val s = new DirectKafkaInputDStream[String, String]( + ssc, + preferredHosts, + ConsumerStrategies.Assign[String, String]( List(topicPartition), kafkaParams.asScala, Map(topicPartition -> 11L))) @@ -267,7 +274,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) val kafkaStream = 
withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt } val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => @@ -360,7 +369,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) withClue("Error creating direct stream") { val kafkaStream = KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val data = rdd.map(_.value).collect() @@ -412,7 +423,9 @@ class DirectKafkaStreamSuite val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)] @@ -486,7 +499,9 @@ class DirectKafkaStreamSuite val kafkaStream = withClue("Error creating direct stream") { new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) { + ssc, + preferredHosts, + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) }.map(r => (r.key, r.value)) @@ -552,7 +567,7 @@ class DirectKafkaStreamSuite preferredHosts, new ConsumerStrategy[String, String] { def executorKafkaParams = ekp - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = { + def onStart(currentOffsets: JMap[TopicPartition, JLong]): Consumer[String, String] = { val consumer = new KafkaConsumer[String, String](kafkaParams) val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) consumer.assign(Arrays.asList(tps: _*)) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 3d2546ddd9..be373af059 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -62,7 +62,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}" ).asJava - private val preferredHosts = PreferConsistent + private val preferredHosts = LocationStrategies.PreferConsistent test("basic usage") { val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}" -- cgit v1.2.3