author     Tathagata Das <tathagata.das1565@gmail.com>   2015-02-09 22:45:48 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-02-09 22:45:48 -0800
commit     c15134632e74e3dee05eda20c6ef79915e15d02e (patch)
tree       4c97e1c6b7951d97950a7ff45c43b79d60733ede /external
parent     ef2f55b97f58fa06acb30e9e0172fb66fba383bc (diff)
[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala  |   5
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala  |   3
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala  |  12
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala  |  23
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  | 353
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala  |  21
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala  |  53
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java  | 159
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java  |   5
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala  | 302
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala  |  24
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala  |  92
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala  |   8
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala  |  62
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala  |   4
15 files changed, 864 insertions, 262 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c7bca43eb8..04e65cb3d7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._
* @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler function for translating each message into the desired type
- * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
*/
private[streaming]
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
+ U <: Decoder[K]: ClassTag,
+ T <: Decoder[V]: ClassTag,
R: ClassTag](
@transient ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index ccc62bfe8f..2f7e0ab39f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -332,6 +332,9 @@ object KafkaCluster {
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
val hpa = hp.split(":")
+ if (hpa.size == 1) {
+ throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+ }
(hpa(0), hpa(1).toInt)
}
}
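
The check added above makes the broker-list format requirement explicit. A small standalone sketch of the same parsing, with IllegalArgumentException standing in for SparkException so it runs outside Spark, and placeholder host names:

    // Every entry of "metadata.broker.list" / "bootstrap.servers" must be <host>:<port>.
    val brokers = "host1:9092,host2:9092"          // "host1:9092,host2" would be rejected
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        // the real code throws SparkException at this point
        throw new IllegalArgumentException(
          s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)                       // (host, port)
    }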
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 50bf7cbdb8..d56cc01be9 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
-private[spark]
+private[kafka]
class KafkaRDD[
K: ClassTag,
V: ClassTag,
@@ -183,7 +181,7 @@ class KafkaRDD[
}
}
-private[spark]
+private[kafka]
object KafkaRDD {
import KafkaCluster.LeaderOffset
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
index 36372e08f6..a842a6f177 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
* @param host preferred kafka host, i.e. the leader at the time the rdd was created
* @param port preferred kafka host's port
*/
-private[spark]
+private[kafka]
class KafkaRDDPartition(
val index: Int,
val topic: String,
@@ -36,24 +36,3 @@ class KafkaRDDPartition(
val host: String,
val port: Int
) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
- def apply(
- index: Int,
- topic: String,
- partition: Int,
- fromOffset: Long,
- untilOffset: Long,
- host: String,
- port: Int
- ): KafkaRDDPartition = new KafkaRDDPartition(
- index,
- topic,
- partition,
- fromOffset,
- untilOffset,
- host,
- port
- )
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index f8aa6c5c62..7a2c3abdcc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,9 @@
package org.apache.spark.streaming.kafka
import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
import java.util.{Map => JMap}
+import java.util.{Set => JSet}
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,18 +29,19 @@ import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, StringDecoder}
-
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
object KafkaUtils {
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
@@ -62,7 +65,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
@@ -81,7 +84,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
@@ -99,7 +102,7 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
@@ -119,10 +122,10 @@ object KafkaUtils {
}
/**
- * Create an input stream that pulls messages from a Kafka Broker.
+ * Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
+ * @param keyTypeClass Key type of DStream
+ * @param valueTypeClass value type of DStream
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
@@ -151,14 +154,14 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
- /** A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition.
+ *
* @param sc SparkContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@@ -166,12 +169,12 @@ object KafkaUtils {
def createRDD[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
@@ -179,121 +182,196 @@ object KafkaUtils {
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
- new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+ new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
- /** A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
+ /**
+ * :: Experimental ::
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
* @param sc SparkContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
* @param leaders Kafka leaders for each offset range in batch
- * @param messageHandler function for translating each message into the desired type
+ * @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
- R: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
+ R: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
leaders: Array[Leader],
messageHandler: MessageAndMetadata[K, V] => R
- ): RDD[R] = {
-
+ ): RDD[R] = {
val leaderMap = leaders
.map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
.toMap
- new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+ new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}
+
/**
- * This stream can guarantee that each message from Kafka is included in transformations
- * (as opposed to output actions) exactly once, even in most failure situations.
+ * Create a RDD from Kafka using offset ranges for each topic and partition.
*
- * Points to note:
- *
- * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
- * as the fromOffsets parameter on restart.
- * Kafka must have sufficient log retention to obtain messages after failure.
- *
- * Getting offsets from the stream - see programming guide
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ */
+ @Experimental
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): JavaPairRDD[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ new JavaPairRDD(createRDD[K, V, KD, VD](
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+ }
+
+ /**
+ * :: Experimental ::
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
*
-. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
- * that depend on Zookeeper, you must store offsets in ZK yourself.
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param leaders Kafka leaders for each offset range in batch
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ @Experimental
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange],
+ leaders: Array[Leader],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaRDD[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ createRDD[K, V, KD, VD, R](
+ jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
*
- * End-to-end semantics - This does not guarantee that any output operation will push each record
- * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
- * outputting exactly once), you have to either ensure that the output operation is
- * idempotent, or transactionally store offsets with the output. See the programming guide for
- * more details.
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param messageHandler function for translating each message into the desired type
- * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the stream
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag,
R: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
- new DirectKafkaInputDStream[K, V, U, T, R](
+ new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, messageHandler)
}
/**
- * This stream can guarantee that each message from Kafka is included in transformations
- * (as opposed to output actions) exactly once, even in most failure situations.
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
*
* Points to note:
- *
- * Failure Recovery - You must checkpoint this stream.
- * Kafka must have sufficient log retention to obtain messages after failure.
- *
- * Getting offsets from the stream - see programming guide
- *
-. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
- * that depend on Zookeeper, you must store offsets in ZK yourself.
- *
- * End-to-end semantics - This does not guarantee that any output operation will push each record
- * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
- * outputting exactly once), you have to ensure that the output operation is idempotent.
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest")
- * @param topics names of the topics to consume
+ * @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag] (
+ KD <: Decoder[K]: ClassTag,
+ VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
@@ -313,11 +391,128 @@ object KafkaUtils {
val fromOffsets = leaderOffsets.map { case (tp, lo) =>
(tp, lo.offset)
}
- new DirectKafkaInputDStream[K, V, U, T, (K, V)](
+ new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}).fold(
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
}
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param recordClass Class of the records in DStream
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ @Experimental
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ fromOffsets: JMap[TopicAndPartition, JLong],
+ messageHandler: JFunction[MessageAndMetadata[K, V], R]
+ ): JavaInputDStream[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ createDirectStream[K, V, KD, VD, R](
+ jssc.ssc,
+ Map(kafkaParams.toSeq: _*),
+ Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
+ messageHandler.call _
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param keyDecoderClass Class of the key decoder
+ * @param valueDecoderClass Class of the value decoder
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+ * to determine where the stream starts (defaults to "largest")
+ * @param topics Names of the topics to consume
+ */
+ @Experimental
+ def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ keyDecoderClass: Class[KD],
+ valueDecoderClass: Class[VD],
+ kafkaParams: JMap[String, String],
+ topics: JSet[String]
+ ): JavaPairInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+ implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+ createDirectStream[K, V, KD, VD](
+ jssc.ssc,
+ Map(kafkaParams.toSeq: _*),
+ Set(topics.toSeq: _*)
+ )
+ }
}
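
Taken together, the scaladoc above describes the new receiver-less direct stream. A minimal Scala sketch of how it might be wired up, assuming a broker at localhost:9092 and a topic named "mytopic" (both placeholders); the call shapes follow the signatures added in this file and the usage in DirectKafkaStreamSuite below:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val conf = new SparkConf().setAppName("DirectKafkaSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(500))
    ssc.checkpoint("/tmp/direct-kafka-checkpoint")   // needed for driver failure recovery

    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",    // Kafka brokers, NOT zookeeper
      "auto.offset.reset" -> "smallest")             // only consulted when not restoring from a checkpoint

    // Receiver-less stream; consumed offsets are tracked by the stream itself.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    stream.foreachRDD { rdd =>
      // Offset ranges consumed in this batch, e.g. for writing back to ZK or an external store.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic} ${r.partition} ${r.fromOffset} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()

As the scaladoc stresses, this gives exactly-once only for the transformations; the output operation still has to be idempotent or transactional for end-to-end exactly-once.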
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
index 3454d92e72..c129a26836 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
@@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
-/** Host info for the leader of a Kafka TopicAndPartition */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represent the host info for the leader of a Kafka partition.
+ */
+@Experimental
final class Leader private(
- /** kafka topic name */
+ /** Kafka topic name */
val topic: String,
- /** kafka partition id */
+ /** Kafka partition id */
val partition: Int,
- /** kafka hostname */
+ /** Leader's hostname */
val host: String,
- /** kafka host's port */
+ /** Leader's port */
val port: Int) extends Serializable
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Leader]].
+ */
+@Experimental
object Leader {
def create(topic: String, partition: Int, host: String, port: Int): Leader =
new Leader(topic, partition, host, port)
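
Leader pairs with the createRDD overload above that takes explicit leaders and a message handler. A minimal sketch under assumed values; the broker host/port, topic name, and offsets are all placeholders:

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

    val sc: SparkContext = ???   // an already-running SparkContext
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    val ranges = Array(OffsetRange.create("mytopic", 0, 0L, 100L))
    // Supplying the leader of each topic/partition lets the RDD fetch from it directly.
    val leaders = Array(Leader.create("mytopic", 0, "localhost", 9092))

    // The message handler sees the full MessageAndMetadata, not just (key, value).
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, ranges, leaders,
      (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset}: ${mmd.message}")

The simpler overload without leaders looks them up itself through KafkaCluster.findLeaders, so passing leaders mainly saves that metadata round trip.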
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 334c12e462..9c3dfeb8f5 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+@Experimental
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}
-/** Represents a range of offsets from a single Kafka TopicAndPartition */
+/**
+ * :: Experimental ::
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ */
+@Experimental
final class OffsetRange private(
- /** kafka topic name */
+ /** Kafka topic name */
val topic: String,
- /** kafka partition id */
+ /** Kafka partition id */
val partition: Int,
/** inclusive starting offset */
val fromOffset: Long,
@@ -36,11 +55,33 @@ final class OffsetRange private(
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]"
+ }
+
/** this is to avoid ClassNotFoundException during checkpoint restore */
private[streaming]
def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
}
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+@Experimental
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
@@ -61,10 +102,10 @@ object OffsetRange {
new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
/** this is to avoid ClassNotFoundException during checkpoint restore */
- private[spark]
+ private[kafka]
type OffsetRangeTuple = (String, Int, Long, Long)
- private[streaming]
+ private[kafka]
def apply(t: OffsetRangeTuple) =
new OffsetRange(t._1, t._2, t._3, t._4)
}
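
OffsetRange is also the unit of work for the batch API. A minimal sketch of reading a fixed range with the simpler createRDD overload; the broker, topic, and offsets are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc: SparkContext = ???   // an already-running SparkContext
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Offsets [0, 100) of partition 0 of "mytopic", as (key, value) pairs.
    val range = OffsetRange.create("mytopic", 0, 0L, 100L)
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, Array(range))

    rdd.map(_._2).take(5).foreach(println)

The test suites in this patch compare untilOffset - fromOffset against the number of records per partition, which reflects the start-inclusive, end-exclusive convention, so the sketch above yields at most 100 messages.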
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000000..1334cc8fd1
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient Random random = new Random();
+ private transient KafkaStreamSuiteBase suiteBase = null;
+
+ @Before
+ public void setUp() {
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
+ System.clearProperty("spark.driver.port");
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ System.clearProperty("spark.driver.port");
+ suiteBase.tearDownKafka();
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ HashSet<String> sent = new HashSet<String>();
+ sent.addAll(Arrays.asList(topic1data));
+ sent.addAll(Arrays.asList(topic2data));
+
+ HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+ kafkaParams.put("auto.offset.reset", "smallest");
+
+ JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topicToSet(topic1)
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(scala.Tuple2<String, String> kv) throws Exception {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ String.class,
+ kafkaParams,
+ topicOffsetToMap(topic2, (long) 0),
+ new Function<MessageAndMetadata<String, String>, String>() {
+ @Override
+ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ return msgAndMd.message();
+ }
+ }
+ );
+ JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+ final HashSet<String> result = new HashSet<String>();
+ unifiedStream.foreachRDD(
+ new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
+ result.addAll(rdd.collect());
+ return null;
+ }
+ }
+ );
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ boolean matches = false;
+ while (!matches && System.currentTimeMillis() - startTime < 20000) {
+ matches = sent.size() == result.size();
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(sent, result);
+ ssc.stop();
+ }
+
+ private HashSet<String> topicToSet(String topic) {
+ HashSet<String> topicSet = new HashSet<String>();
+ topicSet.add(topic);
+ return topicSet;
+ }
+
+ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+ topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
+ return topicMap;
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ suiteBase.createTopic(topic);
+ suiteBase.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 6e1abf3f38..208cc51b29 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -79,9 +79,10 @@ public class JavaKafkaStreamSuite implements Serializable {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- suiteBase.produceAndSendMessage(topic,
+ suiteBase.sendMessages(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
- Predef.<Tuple2<String, Object>>conforms()));
+ Predef.<Tuple2<String, Object>>conforms())
+ );
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000000..b25c2120d5
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, Timeouts}
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.util.Utils
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+
+class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
+ with BeforeAndAfter with BeforeAndAfterAll with Eventually {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ var sc: SparkContext = _
+ var ssc: StreamingContext = _
+ var testDir: File = _
+
+ override def beforeAll {
+ setupKafka()
+ }
+
+ override def afterAll {
+ tearDownKafka()
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ sc = null
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testDir != null) {
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+
+ test("basic stream receiving with multiple topics and smallest starting offset") {
+ val topics = Set("basic1", "basic2", "basic3")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ createTopic(t)
+ sendMessages(t, data)
+ }
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics)
+ }
+ var total = 0L
+
+ stream.foreachRDD { rdd =>
+ // Get the offset ranges in the RDD
+ val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsets(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify whether number of elements in each partition
+ // matches with the corresponding offset range
+ collected.foreach { case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ total += collected.size // Add up all the collected items
+ }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(total === data.values.sum * topics.size, "didn't get all messages")
+ }
+ ssc.stop()
+ }
+
+ test("receiving from largest starting offset") {
+ val topic = "largest"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() > 3)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new mutable.ArrayBuffer[String]()
+ stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+
+ test("creating stream by offset") {
+ val topic = "offset"
+ val topicPartition = TopicAndPartition(topic, 0)
+ val data = Map("a" -> 10)
+ createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "largest"
+ )
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+ }
+
+ // Send some initial messages before starting context
+ sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() >= 10)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+ ssc, kafkaParams, Map(topicPartition -> 11L),
+ (m: MessageAndMetadata[String, String]) => m.message())
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new mutable.ArrayBuffer[String]()
+ stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ collectedData.contains("b")
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ // Test to verify the offset ranges can be recovered from the checkpoints
+ test("offset recovery") {
+ val topic = "recovery"
+ createTopic(topic)
+ testDir = Utils.createTempDir()
+
+ val kafkaParams = Map(
+ "metadata.broker.list" -> s"$brokerAddress",
+ "auto.offset.reset" -> "smallest"
+ )
+
+ // Send data to Kafka and wait for it to be received
+ def sendDataAndWaitForReceive(data: Seq[Int]) {
+ val strings = data.map { _.toString}
+ sendMessages(topic, strings.map { _ -> 1}.toMap)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
+ }
+ }
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val kafkaStream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Set(topic))
+ }
+ val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+ val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+ Some(values.sum + state.getOrElse(0))
+ }
+ ssc.checkpoint(testDir.getAbsolutePath)
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ DirectKafkaStreamSuite.collectedData.appendAll(data)
+ }
+
+ // This is to ensure that all the data is eventually received only once
+ stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+ rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+ }
+ ssc.start()
+
+ // Send some data and wait for them to be received
+ for (i <- (1 to 10).grouped(4)) {
+ sendDataAndWaitForReceive(i)
+ }
+
+ // Verify that offset ranges were generated
+ val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ assert(
+ offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ "starting offset not zero"
+ )
+ ssc.stop()
+ logInfo("====== RESTARTING ========")
+
+ // Recover context from checkpoints
+ ssc = new StreamingContext(testDir.getAbsolutePath)
+ val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+ // Verify offset ranges have been recovered
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+ assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+ val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ assert(
+ recoveredOffsetRanges.forall { or =>
+ earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+ },
+ "Recovered ranges are not the same as the ones generated"
+ )
+
+ // Restart context, give more data and verify the total at the end
+ // If the total is right, that means each record has been received only once
+ ssc.start()
+ sendDataAndWaitForReceive(11 to 20)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+ }
+ ssc.stop()
+ }
+
+ /** Get the generated offset ranges from the DirectKafkaStream */
+ private def getOffsetRanges[K, V](
+ kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+ kafkaStream.generatedRDDs.mapValues { rdd =>
+ rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
+ }.toSeq.sortBy { _._1 }
+ }
+}
+
+object DirectKafkaStreamSuite {
+ val collectedData = new mutable.ArrayBuffer[String]()
+ var total = -1L
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index e57c8f6987..fc9275b720 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -19,33 +19,29 @@ package org.apache.spark.streaming.kafka
import scala.util.Random
-import org.scalatest.BeforeAndAfter
import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
-class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
- val brokerHost = "localhost"
-
- val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
-
- val kc = new KafkaCluster(kafkaParams)
-
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
val topic = "kcsuitetopic" + Random.nextInt(10000)
-
val topicAndPartition = TopicAndPartition(topic, 0)
+ var kc: KafkaCluster = null
- before {
+ override def beforeAll() {
setupKafka()
createTopic(topic)
- produceAndSendMessage(topic, Map("a" -> 1))
+ sendMessages(topic, Map("a" -> 1))
+ kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress"))
}
- after {
+ override def afterAll() {
tearDownKafka()
}
test("metadata apis") {
- val leader = kc.findLeaders(Set(topicAndPartition)).right.get
- assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+ val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+ val leaderAddress = s"${leader._1}:${leader._2}"
+ assert(leaderAddress === brokerAddress, "didn't get leader")
val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
deleted file mode 100644
index 0891ce344f..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-import scala.concurrent.duration._
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually
-
-import kafka.serializer.StringDecoder
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
- val sparkConf = new SparkConf()
- .setMaster("local[4]")
- .setAppName(this.getClass.getSimpleName)
-
- val brokerHost = "localhost"
-
- val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerHost:$brokerPort",
- "auto.offset.reset" -> "smallest"
- )
-
- var ssc: StreamingContext = _
-
- before {
- setupKafka()
-
- ssc = new StreamingContext(sparkConf, Milliseconds(500))
- }
-
- after {
- if (ssc != null) {
- ssc.stop()
- }
- tearDownKafka()
- }
-
- test("multi topic stream") {
- val topics = Set("newA", "newB")
- val data = Map("a" -> 7, "b" -> 9)
- topics.foreach { t =>
- createTopic(t)
- produceAndSendMessage(t, data)
- }
- val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics)
- var total = 0L;
-
- stream.foreachRDD { rdd =>
- val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
- val off = offsets(i)
- val all = iter.toSeq
- val partSize = all.size
- val rangeSize = off.untilOffset - off.fromOffset
- all.map { _ =>
- (partSize, rangeSize)
- }.toIterator
- }.collect
- collected.foreach { case (partSize, rangeSize) =>
- assert(partSize === rangeSize, "offset ranges are wrong")
- }
- total += collected.size
- }
- ssc.start()
- eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(total === data.values.sum * topics.size, "didn't get all messages")
- }
- ssc.stop()
- }
-}
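[Editor's note] The deleted KafkaDirectStreamSuite is superseded by the new DirectKafkaStreamSuite listed in the diffstat, but the check it performed is worth keeping in view: every partition of a direct-stream RDD should contain exactly as many records as its offset range spans. A condensed sketch, restated from the removed code above (it assumes only the HasOffsetRanges trait and the fromOffset/untilOffset fields visible there):

stream.foreachRDD { rdd =>
  // Offsets the direct stream claims to have consumed for this batch.
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val counts = rdd.mapPartitionsWithIndex { (i, iter) =>
    val expected = offsets(i).untilOffset - offsets(i).fromOffset
    Iterator((iter.size.toLong, expected))
  }.collect()
  // Each partition must hold exactly as many records as its offset range spans.
  counts.foreach { case (actual, expected) =>
    assert(actual === expected, "offset ranges are wrong")
  }
}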
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 9b9e3f5fce..6774db854a 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- produceAndSendMessage(topic, sent)
+ sendMessages(topic, sent)
- val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+ val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val kc = new KafkaCluster(kafkaParams)
@@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val rdd2 = getRdd(kc, Set(topic))
val sent2 = Map("d" -> 1)
- produceAndSendMessage(topic, sent2)
+ sendMessages(topic, sent2)
// this is the "0 messages" case
// make sure we don't get anything, since messages were sent after rdd was defined
assert(rdd2.isDefined)
assert(rdd2.get.count === 0)
val rdd3 = getRdd(kc, Set(topic))
- produceAndSendMessage(topic, Map("extra" -> 22))
+ sendMessages(topic, Map("extra" -> 22))
// this is the "exactly 1 message" case
// make sure we get exactly one message, despite there being lots more available
assert(rdd3.isDefined)
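[Editor's note] The KafkaRDDSuite hunks above lean on the suite's getRdd helper (defined outside this excerpt), which snapshots the topic's offsets at call time. A sketch of the "0 messages" boundary case they assert, under that assumption:

// Offsets are captured when the RDD is defined, so later sends are invisible to it.
val snapshot = getRdd(kc, Set(topic))      // offsets snapshotted here
sendMessages(topic, Map("late" -> 3))      // arrives after the snapshot
assert(snapshot.isDefined)
assert(snapshot.get.count === 0)           // the "0 messages" case above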
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index f207dc6d4f..e4966eebb9 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -48,30 +48,41 @@ import org.apache.spark.util.Utils
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
- var zkAddress: String = _
- var zkClient: ZkClient = _
-
private val zkHost = "localhost"
+ private var zkPort: Int = 0
private val zkConnectionTimeout = 6000
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
- private var zkPort: Int = 0
- protected var brokerPort = 9092
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
+ private var zkReady = false
+ private var brokerReady = false
+
+ protected var zkClient: ZkClient = _
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not set up yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not set up yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
- zkAddress = s"$zkHost:$zkPort"
- logInfo("==================== 0 ====================")
+ zkReady = true
+ logInfo("==================== Zookeeper Started ====================")
- zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
- ZKStringSerializer)
- logInfo("==================== 1 ====================")
+ zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+ logInfo("==================== Zookeeper Client Created ====================")
// Kafka broker startup
var bindSuccess: Boolean = false
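[Editor's note] The hunk above replaces the mutable zkAddress field and protected brokerPort with accessors that fail fast unless setup has completed. A usage sketch from a hypothetical test body (the parameter keys mirror those already used elsewhere in this patch):

setupKafka()                                    // flips zkReady/brokerReady to true
val kafkaParams = Map(
  "metadata.broker.list" -> brokerAddress,      // asserts brokerReady first
  "zookeeper.connect"    -> zkAddress           // asserts zkReady first
)
// After tearDownKafka() both flags are cleared, so either accessor asserts
// again rather than handing a stale address to a test.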
@@ -80,9 +91,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
- logInfo("==================== 2 ====================")
server.startup()
- logInfo("==================== 3 ====================")
+ logInfo("==================== Kafka Broker Started ====================")
bindSuccess = true
} catch {
case e: KafkaException =>
@@ -94,10 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
Thread.sleep(2000)
- logInfo("==================== 4 ====================")
+ logInfo("==================== Kafka + Zookeeper Ready ====================")
+ brokerReady = true
}
def tearDownKafka() {
+ brokerReady = false
+ zkReady = false
if (producer != null) {
producer.close()
producer = null
@@ -121,26 +134,23 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}
}
- private def createTestMessage(topic: String, sent: Map[String, Int])
- : Seq[KeyedMessage[String, String]] = {
- val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
- new KeyedMessage[String, String](topic, s)
- }
- messages.toSeq
- }
-
def createTopic(topic: String) {
AdminUtils.createTopic(zkClient, topic, 1, 1)
- logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
+ logInfo(s"==================== Topic $topic Created ====================")
}
- def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]) {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ def sendMessages(topic: String, messages: Array[String]) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
- producer.send(createTestMessage(topic, sent): _*)
+ producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
producer.close()
- logInfo("==================== 6 ====================")
+ logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================")
}
private def getBrokerConfig(): Properties = {
@@ -218,7 +228,7 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
- produceAndSendMessage(topic, sent)
+ sendMessages(topic, sent)
val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 64ccc92c81..fc53c23abd 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -79,7 +79,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
test("Reliable Kafka input stream with single topic") {
var topic = "test-topic"
createTopic(topic)
- produceAndSendMessage(topic, data)
+ sendMessages(topic, data)
// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === None)
@@ -111,7 +111,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
createTopic(t)
- produceAndSendMessage(t, data)
+ sendMessages(t, data)
}
// Before starting, verify all the group/topic/partition offsets are 0.