author     cody koeninger <cody@koeninger.org>    2015-02-11 00:13:27 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-02-11 00:13:27 -0800
commit     658687b25491047f30ee8558733d11e5a0572070 (patch)
tree       9f4d67993bcdf9b331a65a917d79b487bbf4cf7b /external/kafka/src/main
parent     c2131c0cdc57a4871ea23cd71e27e066d3c9a42c (diff)
[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array
Author: cody koeninger <cody@koeninger.org>

Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:

f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicAndPartition
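For illustration, a minimal sketch of the new call shape after this change (all topic, partition, offset, and broker values below are hypothetical):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    def buildRdd(sc: SparkContext) = {
      val kafkaParams = Map("metadata.broker.list" -> "kafka-1.example.com:9092")
      val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))

      // Keying by TopicAndPartition guarantees one leader per partition,
      // which the old Array[Leader] signature could not enforce.
      val leaders = Map(
        TopicAndPartition("events", 0) -> Broker("kafka-1.example.com", 9092))

      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
        sc, kafkaParams, offsetRanges, leaders,
        (mmd: MessageAndMetadata[String, String]) => mmd.message())
    }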
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala (renamed from external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala)  57
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  44
2 files changed, 64 insertions, 37 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
index c129a26836..5a74febb4b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -17,41 +17,52 @@
package org.apache.spark.streaming.kafka
-import kafka.common.TopicAndPartition
-
import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
- * Represent the host info for the leader of a Kafka partition.
+ * Represent the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
*/
@Experimental
-final class Leader private(
- /** Kafka topic name */
- val topic: String,
- /** Kafka partition id */
- val partition: Int,
- /** Leader's hostname */
+final class Broker private(
+ /** Broker's hostname */
val host: String,
- /** Leader's port */
- val port: Int) extends Serializable
+ /** Broker's port */
+ val port: Int) extends Serializable {
+ override def equals(obj: Any): Boolean = obj match {
+ case that: Broker =>
+ this.host == that.host &&
+ this.port == that.port
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ 41 * (41 + host.hashCode) + port
+ }
+
+ override def toString(): String = {
+ s"Broker($host, $port)"
+ }
+}
/**
* :: Experimental ::
- * Companion object the provides methods to create instances of [[Leader]].
+ * Companion object that provides methods to create instances of [[Broker]].
*/
@Experimental
-object Leader {
- def create(topic: String, partition: Int, host: String, port: Int): Leader =
- new Leader(topic, partition, host, port)
-
- def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
- new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
-
- def apply(topic: String, partition: Int, host: String, port: Int): Leader =
- new Leader(topic, partition, host, port)
+object Broker {
+ def create(host: String, port: Int): Broker =
+ new Broker(host, port)
- def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
- new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+ def apply(host: String, port: Int): Broker =
+ new Broker(host, port)
+ def unapply(broker: Broker): Option[(String, Int)] = {
+ if (broker == null) {
+ None
+ } else {
+ Some((broker.host, broker.port))
+ }
+ }
}
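As a quick usage note on the renamed class (a sketch; the host and port values are hypothetical): Broker is constructed through its companion object, and the unapply plus structural equality added above make it safe to use in pattern matches and as a map value:

    import org.apache.spark.streaming.kafka.Broker

    val a = Broker("kafka-1.example.com", 9092)        // via apply
    val b = Broker.create("kafka-1.example.com", 9092) // same instance shape via create
    assert(a == b && a.hashCode == b.hashCode)         // structural equality

    // unapply enables destructuring:
    val Broker(host, port) = a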
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 7a2c3abdcc..af04bc6576 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -154,6 +154,19 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
+ /** get leaders for the given offset ranges, or throw an exception */
+ private def leadersForRanges(
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+ val kc = new KafkaCluster(kafkaParams)
+ val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+ val leaders = kc.findLeaders(topics).fold(
+ errs => throw new SparkException(errs.mkString("\n")),
+ ok => ok
+ )
+ leaders
+ }
+
/**
* Create a RDD from Kafka using offset ranges for each topic and partition.
*
@@ -176,12 +189,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
- val kc = new KafkaCluster(kafkaParams)
- val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
- val leaders = kc.findLeaders(topics).fold(
- errs => throw new SparkException(errs.mkString("\n")),
- ok => ok
- )
+ val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
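The leadersForRanges helper factored out above relies on KafkaCluster.findLeaders returning an Either of accumulated errors or a leader map; fold collapses that into either a thrown exception or the result. A generic sketch of the same unwrap pattern (type parameters assumed for illustration):

    import org.apache.spark.SparkException

    def unwrap[E, A](result: Either[Seq[E], A]): A =
      result.fold(
        errs => throw new SparkException(errs.mkString("\n")), // fail fast, reporting all errors
        ok => ok)                                              // pass the success through unchanged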
@@ -198,7 +206,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
- * @param leaders Kafka leaders for each offset range in batch
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
@@ -211,12 +220,17 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
- leaders: Array[Leader],
+ leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
- val leaderMap = leaders
- .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
- .toMap
+ val leaderMap = if (leaders.isEmpty) {
+ leadersForRanges(kafkaParams, offsetRanges)
+ } else {
+ // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
+ leaders.map {
+ case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+ }.toMap
+ }
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}
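To illustrate the two call styles this fallback enables (reusing the hypothetical sc, kafkaParams, offsetRanges, and leaders from the sketch above):

    val handler = (mmd: MessageAndMetadata[String, String]) => mmd.message()

    // Empty map: leaders are looked up on the driver via KafkaCluster.
    val viaLookup = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, Map[TopicAndPartition, Broker](), handler)

    // Explicit map: skips the driver-side metadata lookup.
    val viaLeaders = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, leaders, handler)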
@@ -263,7 +277,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
- * @param leaders Kafka leaders for each offset range in batch
+ * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+ * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
@@ -276,7 +291,7 @@ object KafkaUtils {
recordClass: Class[R],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange],
- leaders: Array[Leader],
+ leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
@@ -284,8 +299,9 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val leaderMap = Map(leaders.toSeq: _*)
createRDD[K, V, KD, VD, R](
- jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
}
/**
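The Java-facing overload above copies the java.util.Map into an immutable Scala Map with Map(leaders.toSeq: _*). A minimal sketch of that conversion in isolation, assuming the JavaConversions implicits that KafkaUtils already relies on for Map(kafkaParams.toSeq: _*):

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConversions._

    val jmap = new JHashMap[String, Int]()
    jmap.put("kafka-1.example.com", 9092)
    // The implicit view gives the Java map a toSeq; Map(_: _*) copies it
    // into an immutable Scala Map.
    val smap: Map[String, Int] = Map(jmap.toSeq: _*)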