path: root/external/kafka/src/main
authorTathagata Das <tathagata.das1565@gmail.com>2015-02-09 22:45:48 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-09 22:45:48 -0800
commitc15134632e74e3dee05eda20c6ef79915e15d02e (patch)
tree4c97e1c6b7951d97950a7ff45c43b79d60733ede /external/kafka/src/main
parentef2f55b97f58fa06acb30e9e0172fb66fba383bc (diff)
[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 5
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala | 3
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 12
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala | 23
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 353
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala | 21
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala | 53
7 files changed, 348 insertions, 122 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c7bca43eb8..04e65cb3d7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._
* @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler function for translating each message into the desired type
- * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
*/
private[streaming]
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
+ U <: Decoder[K]: ClassTag,
+ T <: Decoder[V]: ClassTag,
R: ClassTag](
@transient ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
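Illustrative sketch (not part of this patch): the tightened bounds U <: Decoder[K] and T <: Decoder[V] mean the decoder type parameters must now agree with the key and value types, so a mismatched decoder is caught at compile time instead of failing at runtime. Assuming the stock kafka.serializer decoders and a caller-supplied context, params and topics:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def decoderBoundSketch(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]) = {
  // Type-checks: StringDecoder is a Decoder[String], DefaultDecoder is a Decoder[Array[Byte]]
  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, topics)
  // Under the old Decoder[_] bounds the decoders could be swapped silently;
  // with the new bounds the following would be rejected by the compiler:
  // KafkaUtils.createDirectStream[String, Array[Byte], DefaultDecoder, StringDecoder](
  //   ssc, kafkaParams, topics)
}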
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index ccc62bfe8f..2f7e0ab39f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -332,6 +332,9 @@ object KafkaCluster {
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
val hpa = hp.split(":")
+ if (hpa.size == 1) {
+ throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+ }
(hpa(0), hpa(1).toInt)
}
}
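Illustrative sketch (hostnames are hypothetical): the new check fails fast when a broker entry lacks a port, so the broker list must carry host:port pairs:

val kafkaParams = Map(
  // "broker1,broker2" (no ports) would now throw the SparkException above
  "metadata.broker.list" -> "broker1:9092,broker2:9092")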
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 50bf7cbdb8..d56cc01be9 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
-private[spark]
+private[kafka]
class KafkaRDD[
K: ClassTag,
V: ClassTag,
@@ -183,7 +181,7 @@ class KafkaRDD[
}
}
-private[spark]
+private[kafka]
object KafkaRDD {
import KafkaCluster.LeaderOffset
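Illustrative sketch of the batch path this scaladoc describes (topic, offsets and brokers are made up); since KafkaRDD is now private[kafka], user code goes through KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

def batchRead(sc: SparkContext): Unit = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
  val offsetRanges = Array(
    OffsetRange.create("pageviews", 0, 0L, 100L),  // topic, partition, fromOffset, untilOffset
    OffsetRange.create("pageviews", 1, 0L, 100L))
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)
  rdd.map { case (_, value) => value }.take(10).foreach(println)
}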
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
index 36372e08f6..a842a6f177 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
* @param host preferred kafka host, i.e. the leader at the time the rdd was created
* @param port preferred kafka host's port
*/
-private[spark]
+private[kafka]
class KafkaRDDPartition(
val index: Int,
val topic: String,
@@ -36,24 +36,3 @@ class KafkaRDDPartition(
val host: String,
val port: Int
) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
- def apply(
- index: Int,
- topic: String,
- partition: Int,
- fromOffset: Long,
- untilOffset: Long,
- host: String,
- port: Int
- ): KafkaRDDPartition = new KafkaRDDPartition(
- index,
- topic,
- partition,
- fromOffset,
- untilOffset,
- host,
- port
- )
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index f8aa6c5c62..7a2c3abdcc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,9 @@
package org.apache.spark.streaming.kafka
import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
import java.util.{Map => JMap}
+import java.util.{Set => JSet}
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,18 +29,19 @@ import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, StringDecoder}
-
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
object KafkaUtils {
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
@@ -62,7 +65,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
@@ -81,7 +84,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
@@ -99,7 +102,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
@@ -119,10 +122,10 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
+ * @param keyTypeClass Key type of DStream
+ * @param valueTypeClass Value type of DStream
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
@@ -151,14 +154,14 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
- /** A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
+ /**
+ * Create an RDD from Kafka using offset ranges for each topic and partition.
+ *
* @param sc SparkContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@@ -166,12 +169,12 @@ object KafkaUtils {
def createRDD[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
@@ -179,121 +182,196 @@ object KafkaUtils {
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
- new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
- /** A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
+ /**
+ * :: Experimental ::
+ * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
* @param sc SparkContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
* @param leaders Kafka leaders for each offset range in batch
- * @param messageHandler function for translating each message into the desired type
+ * @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
- R: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
leaders: Array[Leader],
messageHandler: MessageAndMetadata[K, V] => R
- ): RDD[R] = {
-
+ ): RDD[R] = {
val leaderMap = leaders
.map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
.toMap
- new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+ new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}
+
/**
- * This stream can guarantee that each message from Kafka is included in transformations
- * (as opposed to output actions) exactly once, even in most failure situations.
+ * Create an RDD from Kafka using offset ranges for each topic and partition.
*
- * Points to note:
- *
- * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
- * as the fromOffsets parameter on restart.
- * Kafka must have sufficient log retention to obtain messages after failure.
- *
- * Getting offsets from the stream - see programming guide
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ */
+ @Experimental
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): JavaPairRDD[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ new JavaPairRDD(createRDD[K, V, KD, VD](
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
*
- * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
- * that depend on Zookeeper, you must store offsets in ZK yourself.
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka leaders for each offset range in batch
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ @Experimental
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: Array[Leader],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaRDD[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ createRDD[K, V, KD, VD, R](
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
*
- * End-to-end semantics - This does not guarantee that any output operation will push each record
- * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
- * outputting exactly once), you have to either ensure that the output operation is
- * idempotent, or transactionally store offsets with the output. See the programming guide for
- * more details.
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka.
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param messageHandler function for translating each message into the desired type
- * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the stream
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
R: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
- new DirectKafkaInputDStream[K, V, U, T, R](
+ new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, messageHandler)
}
/**
- * This stream can guarantee that each message from Kafka is included in transformations
- * (as opposed to output actions) exactly once, even in most failure situations.
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
*
* Points to note:
- *
- * Failure Recovery - You must checkpoint this stream.
- * Kafka must have sufficient log retention to obtain messages after failure.
- *
- * Getting offsets from the stream - see programming guide
- *
- * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
- * that depend on Zookeeper, you must store offsets in ZK yourself.
- *
- * End-to-end semantics - This does not guarantee that any output operation will push each record
- * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
- * outputting exactly once), you have to ensure that the output operation is idempotent.
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka.
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest")
- * @param topics names of the topics to consume
+ * @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
@@ -313,11 +391,128 @@ object KafkaUtils {
val fromOffsets = leaderOffsets.map { case (tp, lo) =>
(tp, lo.offset)
}
- new DirectKafkaInputDStream[K, V, U, T, (K, V)](
+ new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}).fold(
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
}
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka.
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param recordClass Class of the records in DStream
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ @Experimental
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaInputDStream[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ createDirectStream[K, V, KD, VD, R](
+ jssc.ssc,
+ Map(kafkaParams.toSeq: _*),
+ Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
+ messageHandler.call _
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka.
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ */
+ @Experimental
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ topics: JSet[String]
+ ): JavaPairInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ createDirectStream[K, V, KD, VD](
+ jssc.ssc,
+ Map(kafkaParams.toSeq: _*),
+ Set(topics.toSeq: _*)
+ )
+ }
}
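Putting the new direct API together, an illustrative sketch (broker, topic and checkpoint directory are placeholders) that creates the receiver-less stream and reads the per-batch offsets back through HasOffsetRanges, as the scaladoc above describes:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafkaSketch"), Seconds(2))
    ssc.checkpoint("/tmp/direct-kafka-checkpoint")  // enables recovery of consumed offsets after driver failure
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("pageviews"))

    stream.foreachRDD { rdd =>
      // RDDs generated by the direct stream expose the offset ranges they cover
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
      println(s"records in batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}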
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
index 3454d92e72..c129a26836 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
@@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
-/** Host info for the leader of a Kafka TopicAndPartition */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents the host info for the leader of a Kafka partition.
+ */
+@Experimental
final class Leader private(
- /** kafka topic name */
+ /** Kafka topic name */
val topic: String,
- /** kafka partition id */
+ /** Kafka partition id */
val partition: Int,
- /** kafka hostname */
+ /** Leader's hostname */
val host: String,
- /** kafka host's port */
+ /** Leader's port */
val port: Int) extends Serializable
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Leader]].
+ */
+@Experimental
object Leader {
def create(topic: String, partition: Int, host: String, port: Int): Leader =
new Leader(topic, partition, host, port)
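Illustrative sketch of the leader-aware createRDD variant that Leader exists for (hosts, topic and offsets are made up); supplying leaders lets the RDD fetch from the known leader directly, and the message handler keeps metadata alongside the payload:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

def leaderAwareRead(sc: SparkContext): Unit = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
  val offsetRanges = Array(OffsetRange.create("pageviews", 0, 0L, 100L))
  val leaders = Array(Leader.create("pageviews", 0, "broker1", 9092))
  val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
    sc, kafkaParams, offsetRanges, leaders, handler)
  rdd.take(5).foreach(println)
}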
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 334c12e462..9c3dfeb8f5 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+@Experimental
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}
-/** Represents a range of offsets from a single Kafka TopicAndPartition */
+/**
+ * :: Experimental ::
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ */
+@Experimental
final class OffsetRange private(
- /** kafka topic name */
+ /** Kafka topic name */
val topic: String,
- /** kafka partition id */
+ /** Kafka partition id */
val partition: Int,
/** inclusive starting offset */
val fromOffset: Long,
@@ -36,11 +55,33 @@ final class OffsetRange private(
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]"
+ }
+
/** this is to avoid ClassNotFoundException during checkpoint restore */
private[streaming]
def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
}
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+@Experimental
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
@@ -61,10 +102,10 @@ object OffsetRange {
new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
/** this is to avoid ClassNotFoundException during checkpoint restore */
- private[spark]
+ private[kafka]
type OffsetRangeTuple = (String, Int, Long, Long)
- private[streaming]
+ private[kafka]
def apply(t: OffsetRangeTuple) =
new OffsetRange(t._1, t._2, t._3, t._4)
}
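Illustrative sketch (values are made up): the new equals/hashCode make OffsetRange compare by value, which is what lets offset ranges recovered from a checkpoint be checked against the originals, as the offset-recovery unit test mentioned in the commit message does:

import org.apache.spark.streaming.kafka.OffsetRange

val original  = OffsetRange.create("pageviews", 0, 0L, 100L)
val recovered = OffsetRange.create("pageviews", 0, 0L, 100L)  // e.g. rebuilt after checkpoint restore
assert(original == recovered && original.hashCode == recovered.hashCode)
println(original)  // OffsetRange(topic: 'pageviews', partition: 0, range: [0 -> 100])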