From c15134632e74e3dee05eda20c6ef79915e15d02e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 9 Feb 2015 22:45:48 -0800 Subject: [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream Changes - Added example - Added a critical unit test that verifies that offset ranges can be recovered through checkpoints Might add more changes. Author: Tathagata Das Closes #4384 from tdas/new-kafka-fixes and squashes the following commits: 7c931c3 [Tathagata Das] Small update 3ed9284 [Tathagata Das] updated scala doc 83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example. 26df23c [Tathagata Das] Updates based on PR comments from Cody e4abf69 [Tathagata Das] Scala doc improvements and stuff. bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite 50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs. e73589c [Tathagata Das] Minor changes. 4986784 [Tathagata Das] Added unit test to kafka offset recovery 6a91cab [Tathagata Das] Added example --- .../streaming/kafka/DirectKafkaInputDStream.scala | 5 +- .../spark/streaming/kafka/KafkaCluster.scala | 3 + .../apache/spark/streaming/kafka/KafkaRDD.scala | 12 +- .../spark/streaming/kafka/KafkaRDDPartition.scala | 23 +- .../apache/spark/streaming/kafka/KafkaUtils.scala | 353 ++++++++++++++++----- .../org/apache/spark/streaming/kafka/Leader.scala | 21 +- .../apache/spark/streaming/kafka/OffsetRange.scala | 53 +++- .../kafka/JavaDirectKafkaStreamSuite.java | 159 ++++++++++ .../streaming/kafka/JavaKafkaStreamSuite.java | 5 +- .../streaming/kafka/DirectKafkaStreamSuite.scala | 302 ++++++++++++++++++ .../spark/streaming/kafka/KafkaClusterSuite.scala | 24 +- .../streaming/kafka/KafkaDirectStreamSuite.scala | 92 ------ .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 +- .../spark/streaming/kafka/KafkaStreamSuite.scala | 62 ++-- .../streaming/kafka/ReliableKafkaStreamSuite.scala | 4 +- 15 files changed, 864 insertions(+), 262 deletions(-) create mode 100644 external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala delete mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala (limited to 'external') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index c7bca43eb8..04e65cb3d7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream * @param messageHandler function for translating each message into the desired type - * @param maxRetries maximum number of times in a row to retry getting leaders' offsets */ private[streaming] class DirectKafkaInputDStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + U <: Decoder[K]: ClassTag, + T <: Decoder[V]: ClassTag, R: ClassTag]( @transient ssc_ : StreamingContext, val kafkaParams: Map[String, String], diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index ccc62bfe8f..2f7e0ab39f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -332,6 +332,9 @@ object KafkaCluster { extends ConsumerConfig(originalProps) { val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp => val hpa = hp.split(":") + if (hpa.size == 1) { + throw new SparkException(s"Broker not the in correct format of : [$brokers]") + } (hpa(0), hpa(1).toInt) } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 50bf7cbdb8..d56cc01be9 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param batch Each KafkaRDDPartition in the batch corresponds to a - * range of offsets for a given Kafka topic/partition + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */ -private[spark] +private[kafka] class KafkaRDD[ K: ClassTag, V: ClassTag, @@ -183,7 +181,7 @@ class KafkaRDD[ } } -private[spark] +private[kafka] object KafkaRDD { import KafkaCluster.LeaderOffset diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala index 36372e08f6..a842a6f177 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala @@ -26,7 +26,7 @@ import org.apache.spark.Partition * @param host preferred kafka host, i.e. the leader at the time the rdd was created * @param port preferred kafka host's port */ -private[spark] +private[kafka] class KafkaRDDPartition( val index: Int, val topic: String, @@ -36,24 +36,3 @@ class KafkaRDDPartition( val host: String, val port: Int ) extends Partition - -private[spark] -object KafkaRDDPartition { - def apply( - index: Int, - topic: String, - partition: Int, - fromOffset: Long, - untilOffset: Long, - host: String, - port: Int - ): KafkaRDDPartition = new KafkaRDDPartition( - index, - topic, - partition, - fromOffset, - untilOffset, - host, - port - ) -} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index f8aa6c5c62..7a2c3abdcc 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -18,7 +18,9 @@ package org.apache.spark.streaming.kafka import java.lang.{Integer => JInt} +import java.lang.{Long => JLong} import java.util.{Map => JMap} +import java.util.{Set => JSet} import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,18 +29,19 @@ import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.{Decoder, StringDecoder} - +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} object KafkaUtils { /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer @@ -62,7 +65,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param kafkaParams Map of kafka configuration parameters, * see http://kafka.apache.org/08/configuration.html @@ -81,7 +84,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) @@ -99,7 +102,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. @@ -119,10 +122,10 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD + * @param keyTypeClass Key type of DStream + * @param valueTypeClass value type of Dstream * @param keyDecoderClass Type of kafka key decoder * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration parameters, @@ -151,14 +154,14 @@ object KafkaUtils { jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } - /** A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * * @param sc SparkContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ @@ -166,12 +169,12 @@ object KafkaUtils { def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = { + ): RDD[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet @@ -179,121 +182,196 @@ object KafkaUtils { errs => throw new SparkException(errs.mkString("\n")), ok => ok ) - new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) } - /** A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * * @param sc SparkContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition * @param leaders Kafka leaders for each offset range in batch - * @param messageHandler function for translating each message into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, + R: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange], leaders: Array[Leader], messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = { - + ): RDD[R] = { val leaderMap = leaders .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port)) .toMap - new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) } + /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * Create a RDD from Kafka using offset ranges for each topic and partition. * - * Points to note: - * - * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them - * as the fromOffsets parameter on restart. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + */ + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange] + ): JavaPairRDD[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + new JavaPairRDD(createRDD[K, V, KD, VD]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + } + + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka leaders for each offset range in batch + * @param messageHandler Function for translating each message and metadata into the desired type + */ + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaRDD[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createRDD[K, V, KD, VD, R]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) + } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to either ensure that the output operation is - * idempotent, or transactionally store offsets with the output. See the programming guide for - * more details. + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param messageHandler function for translating each message into the desired type - * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { - new DirectKafkaInputDStream[K, V, U, T, R]( + new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, messageHandler) } /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: - * - * Failure Recovery - You must checkpoint this stream. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide - * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. - * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to ensure that the output operation is idempotent. + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" * to determine where the stream starts (defaults to "largest") - * @param topics names of the topics to consume + * @param topics Names of the topics to consume */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] @@ -313,11 +391,128 @@ object KafkaUtils { val fromOffsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) } - new DirectKafkaInputDStream[K, V, U, T, (K, V)]( + new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }).fold( errs => throw new SparkException(errs.mkString("\n")), ok => ok ) } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class of the value decoder + * @param recordClass Class of the records in DStream + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + fromOffsets: JMap[TopicAndPartition, JLong], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaInputDStream[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createDirectStream[K, V, KD, VD, R]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), + messageHandler.call _ + ) + } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class type of the value decoder + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * to determine where the stream starts (defaults to "largest") + * @param topics Names of the topics to consume + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + topics: JSet[String] + ): JavaPairInputDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + createDirectStream[K, V, KD, VD]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Set(topics.toSeq: _*) + ) + } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala index 3454d92e72..c129a26836 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala @@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Host info for the leader of a Kafka TopicAndPartition */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represent the host info for the leader of a Kafka partition. + */ +@Experimental final class Leader private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, - /** kafka hostname */ + /** Leader's hostname */ val host: String, - /** kafka host's port */ + /** Leader's port */ val port: Int) extends Serializable +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[Leader]]. + */ +@Experimental object Leader { def create(topic: String, partition: Int, host: String, port: Int): Leader = new Leader(topic, partition, host, port) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 334c12e462..9c3dfeb8f5 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Something that has a collection of OffsetRanges */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +@Experimental trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] } -/** Represents a range of offsets from a single Kafka TopicAndPartition */ +/** + * :: Experimental :: + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. + */ +@Experimental final class OffsetRange private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, /** inclusive starting offset */ val fromOffset: Long, @@ -36,11 +55,33 @@ final class OffsetRange private( val untilOffset: Long) extends Serializable { import OffsetRange.OffsetRangeTuple + override def equals(obj: Any): Boolean = obj match { + case that: OffsetRange => + this.topic == that.topic && + this.partition == that.partition && + this.fromOffset == that.fromOffset && + this.untilOffset == that.untilOffset + case _ => false + } + + override def hashCode(): Int = { + toTuple.hashCode() + } + + override def toString(): String = { + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]" + } + /** this is to avoid ClassNotFoundException during checkpoint restore */ private[streaming] def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) } +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[OffsetRange]]. + */ +@Experimental object OffsetRange { def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset) @@ -61,10 +102,10 @@ object OffsetRange { new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) /** this is to avoid ClassNotFoundException during checkpoint restore */ - private[spark] + private[kafka] type OffsetRangeTuple = (String, Int, Long, Long) - private[streaming] + private[kafka] def apply(t: OffsetRangeTuple) = new OffsetRange(t._1, t._2, t._3, t._4) } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java new file mode 100644 index 0000000000..1334cc8fd1 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.Arrays; + +import org.apache.spark.SparkConf; + +import scala.Tuple2; + +import junit.framework.Assert; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +public class JavaDirectKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient Random random = new Random(); + private transient KafkaStreamSuiteBase suiteBase = null; + + @Before + public void setUp() { + suiteBase = new KafkaStreamSuiteBase() { }; + suiteBase.setupKafka(); + System.clearProperty("spark.driver.port"); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + System.clearProperty("spark.driver.port"); + suiteBase.tearDownKafka(); + } + + @Test + public void testKafkaStream() throws InterruptedException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + String[] topic1data = createTopicAndSendData(topic1); + String[] topic2data = createTopicAndSendData(topic2); + + HashSet sent = new HashSet(); + sent.addAll(Arrays.asList(topic1data)); + sent.addAll(Arrays.asList(topic2data)); + + HashMap kafkaParams = new HashMap(); + kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + kafkaParams.put("auto.offset.reset", "smallest"); + + JavaDStream stream1 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicToSet(topic1) + ).map( + new Function, String>() { + @Override + public String call(scala.Tuple2 kv) throws Exception { + return kv._2(); + } + } + ); + + JavaDStream stream2 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + topicOffsetToMap(topic2, (long) 0), + new Function, String>() { + @Override + public String call(MessageAndMetadata msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + JavaDStream unifiedStream = stream1.union(stream2); + + final HashSet result = new HashSet(); + unifiedStream.foreachRDD( + new Function, Void>() { + @Override + public Void call(org.apache.spark.api.java.JavaRDD rdd) throws Exception { + result.addAll(rdd.collect()); + return null; + } + } + ); + ssc.start(); + long startTime = System.currentTimeMillis(); + boolean matches = false; + while (!matches && System.currentTimeMillis() - startTime < 20000) { + matches = sent.size() == result.size(); + Thread.sleep(50); + } + Assert.assertEquals(sent, result); + ssc.stop(); + } + + private HashSet topicToSet(String topic) { + HashSet topicSet = new HashSet(); + topicSet.add(topic); + return topicSet; + } + + private HashMap topicOffsetToMap(String topic, Long offsetToStart) { + HashMap topicMap = new HashMap(); + topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); + return topicMap; + } + + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; + suiteBase.createTopic(topic); + suiteBase.sendMessages(topic, data); + return data; + } +} diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 6e1abf3f38..208cc51b29 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -79,9 +79,10 @@ public class JavaKafkaStreamSuite implements Serializable { suiteBase.createTopic(topic); HashMap tmp = new HashMap(sent); - suiteBase.produceAndSendMessage(topic, + suiteBase.sendMessages(topic, JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); + Predef.>conforms()) + ); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", suiteBase.zkAddress()); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala new file mode 100644 index 0000000000..b25c2120d5 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.io.File + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.postfixOps + +import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.{Eventually, Timeouts} + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.util.Utils +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata + +class DirectKafkaStreamSuite extends KafkaStreamSuiteBase + with BeforeAndAfter with BeforeAndAfterAll with Eventually { + val sparkConf = new SparkConf() + .setMaster("local[4]") + .setAppName(this.getClass.getSimpleName) + + var sc: SparkContext = _ + var ssc: StreamingContext = _ + var testDir: File = _ + + override def beforeAll { + setupKafka() + } + + override def afterAll { + tearDownKafka() + } + + after { + if (ssc != null) { + ssc.stop() + sc = null + } + if (sc != null) { + sc.stop() + } + if (testDir != null) { + Utils.deleteRecursively(testDir) + } + } + + + test("basic stream receiving with multiple topics and smallest starting offset") { + val topics = Set("basic1", "basic2", "basic3") + val data = Map("a" -> 7, "b" -> 9) + topics.foreach { t => + createTopic(t) + sendMessages(t, data) + } + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "smallest" + ) + + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics) + } + var total = 0L + + stream.foreachRDD { rdd => + // Get the offset ranges in the RDD + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + val collected = rdd.mapPartitionsWithIndex { (i, iter) => + // For each partition, get size of the range in the partition, + // and the number of items in the partition + val off = offsets(i) + val all = iter.toSeq + val partSize = all.size + val rangeSize = off.untilOffset - off.fromOffset + Iterator((partSize, rangeSize)) + }.collect + + // Verify whether number of elements in each partition + // matches with the corresponding offset range + collected.foreach { case (partSize, rangeSize) => + assert(partSize === rangeSize, "offset ranges are wrong") + } + total += collected.size // Add up all the collected items + } + ssc.start() + eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { + assert(total === data.values.sum * topics.size, "didn't get all messages") + } + ssc.stop() + } + + test("receiving from largest starting offset") { + val topic = "largest" + val topicPartition = TopicAndPartition(topic, 0) + val data = Map("a" -> 10) + createTopic(topic) + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "largest" + ) + val kc = new KafkaCluster(kafkaParams) + def getLatestOffset(): Long = { + kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset + } + + // Send some initial messages before starting context + sendMessages(topic, data) + eventually(timeout(10 seconds), interval(20 milliseconds)) { + assert(getLatestOffset() > 3) + } + val offsetBeforeStart = getLatestOffset() + + // Setup context and kafka stream with largest offset + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, Set(topic)) + } + assert( + stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] + .fromOffsets(topicPartition) >= offsetBeforeStart, + "Start offset not from latest" + ) + + val collectedData = new mutable.ArrayBuffer[String]() + stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } + ssc.start() + val newData = Map("b" -> 10) + sendMessages(topic, newData) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + collectedData.contains("b") + } + assert(!collectedData.contains("a")) + } + + + test("creating stream by offset") { + val topic = "offset" + val topicPartition = TopicAndPartition(topic, 0) + val data = Map("a" -> 10) + createTopic(topic) + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "largest" + ) + val kc = new KafkaCluster(kafkaParams) + def getLatestOffset(): Long = { + kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset + } + + // Send some initial messages before starting context + sendMessages(topic, data) + eventually(timeout(10 seconds), interval(20 milliseconds)) { + assert(getLatestOffset() >= 10) + } + val offsetBeforeStart = getLatestOffset() + + // Setup context and kafka stream with largest offset + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( + ssc, kafkaParams, Map(topicPartition -> 11L), + (m: MessageAndMetadata[String, String]) => m.message()) + } + assert( + stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] + .fromOffsets(topicPartition) >= offsetBeforeStart, + "Start offset not from latest" + ) + + val collectedData = new mutable.ArrayBuffer[String]() + stream.foreachRDD { rdd => collectedData ++= rdd.collect() } + ssc.start() + val newData = Map("b" -> 10) + sendMessages(topic, newData) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + collectedData.contains("b") + } + assert(!collectedData.contains("a")) + } + + // Test to verify the offset ranges can be recovered from the checkpoints + test("offset recovery") { + val topic = "recovery" + createTopic(topic) + testDir = Utils.createTempDir() + + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "smallest" + ) + + // Send data to Kafka and wait for it to be received + def sendDataAndWaitForReceive(data: Seq[Int]) { + val strings = data.map { _.toString} + sendMessages(topic, strings.map { _ -> 1}.toMap) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains }) + } + } + + // Setup the streaming context + ssc = new StreamingContext(sparkConf, Milliseconds(100)) + val kafkaStream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, Set(topic)) + } + val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt } + val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => + Some(values.sum + state.getOrElse(0)) + } + ssc.checkpoint(testDir.getAbsolutePath) + + // This is to collect the raw data received from Kafka + kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => + val data = rdd.map { _._2 }.collect() + DirectKafkaStreamSuite.collectedData.appendAll(data) + } + + // This is ensure all the data is eventually receiving only once + stateStream.foreachRDD { (rdd: RDD[(String, Int)]) => + rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 } + } + ssc.start() + + // Send some data and wait for them to be received + for (i <- (1 to 10).grouped(4)) { + sendDataAndWaitForReceive(i) + } + + // Verify that offset ranges were generated + val offsetRangesBeforeStop = getOffsetRanges(kafkaStream) + assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated") + assert( + offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 }, + "starting offset not zero" + ) + ssc.stop() + logInfo("====== RESTARTING ========") + + // Recover context from checkpoints + ssc = new StreamingContext(testDir.getAbsolutePath) + val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]] + + // Verify offset ranges have been recovered + val recoveredOffsetRanges = getOffsetRanges(recoveredStream) + assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered") + val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) } + assert( + recoveredOffsetRanges.forall { or => + earlierOffsetRangesAsSets.contains((or._1, or._2.toSet)) + }, + "Recovered ranges are not the same as the ones generated" + ) + + // Restart context, give more data and verify the total at the end + // If the total is write that means each records has been received only once + ssc.start() + sendDataAndWaitForReceive(11 to 20) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + assert(DirectKafkaStreamSuite.total === (1 to 20).sum) + } + ssc.stop() + } + + /** Get the generated offset ranges from the DirectKafkaStream */ + private def getOffsetRanges[K, V]( + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream.generatedRDDs.mapValues { rdd => + rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges + }.toSeq.sortBy { _._1 } + } +} + +object DirectKafkaStreamSuite { + val collectedData = new mutable.ArrayBuffer[String]() + var total = -1L +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index e57c8f6987..fc9275b720 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -19,33 +19,29 @@ package org.apache.spark.streaming.kafka import scala.util.Random -import org.scalatest.BeforeAndAfter import kafka.common.TopicAndPartition +import org.scalatest.BeforeAndAfterAll -class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter { - val brokerHost = "localhost" - - val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort") - - val kc = new KafkaCluster(kafkaParams) - +class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { val topic = "kcsuitetopic" + Random.nextInt(10000) - val topicAndPartition = TopicAndPartition(topic, 0) + var kc: KafkaCluster = null - before { + override def beforeAll() { setupKafka() createTopic(topic) - produceAndSendMessage(topic, Map("a" -> 1)) + sendMessages(topic, Map("a" -> 1)) + kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress")) } - after { + override def afterAll() { tearDownKafka() } test("metadata apis") { - val leader = kc.findLeaders(Set(topicAndPartition)).right.get - assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader") + val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition) + val leaderAddress = s"${leader._1}:${leader._2}" + assert(leaderAddress === brokerAddress, "didn't get leader") val parts = kc.getPartitions(Set(topic)).right.get assert(parts(topicAndPartition), "didn't get partitions") diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala deleted file mode 100644 index 0891ce344f..0000000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.util.Random -import scala.concurrent.duration._ - -import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually - -import kafka.serializer.StringDecoder - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} - -class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { - val sparkConf = new SparkConf() - .setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - - val brokerHost = "localhost" - - val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerHost:$brokerPort", - "auto.offset.reset" -> "smallest" - ) - - var ssc: StreamingContext = _ - - before { - setupKafka() - - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - } - - after { - if (ssc != null) { - ssc.stop() - } - tearDownKafka() - } - - test("multi topic stream") { - val topics = Set("newA", "newB") - val data = Map("a" -> 7, "b" -> 9) - topics.foreach { t => - createTopic(t) - produceAndSendMessage(t, data) - } - val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics) - var total = 0L; - - stream.foreachRDD { rdd => - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - val collected = rdd.mapPartitionsWithIndex { (i, iter) => - val off = offsets(i) - val all = iter.toSeq - val partSize = all.size - val rangeSize = off.untilOffset - off.fromOffset - all.map { _ => - (partSize, rangeSize) - }.toIterator - }.collect - collected.foreach { case (partSize, rangeSize) => - assert(partSize === rangeSize, "offset ranges are wrong") - } - total += collected.size - } - ssc.start() - eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { - assert(total === data.values.sum * topics.size, "didn't get all messages") - } - ssc.stop() - } -} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 9b9e3f5fce..6774db854a 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - produceAndSendMessage(topic, sent) + sendMessages(topic, sent) - val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort", + val kafkaParams = Map("metadata.broker.list" -> brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val kc = new KafkaCluster(kafkaParams) @@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val rdd2 = getRdd(kc, Set(topic)) val sent2 = Map("d" -> 1) - produceAndSendMessage(topic, sent2) + sendMessages(topic, sent2) // this is the "0 messages" case // make sure we dont get anything, since messages were sent after rdd was defined assert(rdd2.isDefined) assert(rdd2.get.count === 0) val rdd3 = getRdd(kc, Set(topic)) - produceAndSendMessage(topic, Map("extra" -> 22)) + sendMessages(topic, Map("extra" -> 22)) // this is the "exactly 1 message" case // make sure we get exactly one message, despite there being lots more available assert(rdd3.isDefined) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index f207dc6d4f..e4966eebb9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -48,30 +48,41 @@ import org.apache.spark.util.Utils */ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging { - var zkAddress: String = _ - var zkClient: ZkClient = _ - private val zkHost = "localhost" + private var zkPort: Int = 0 private val zkConnectionTimeout = 6000 private val zkSessionTimeout = 6000 private var zookeeper: EmbeddedZookeeper = _ - private var zkPort: Int = 0 - protected var brokerPort = 9092 + private val brokerHost = "localhost" + private var brokerPort = 9092 private var brokerConf: KafkaConfig = _ private var server: KafkaServer = _ private var producer: Producer[String, String] = _ + private var zkReady = false + private var brokerReady = false + + protected var zkClient: ZkClient = _ + + def zkAddress: String = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") + s"$brokerHost:$brokerPort" + } def setupKafka() { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort - zkAddress = s"$zkHost:$zkPort" - logInfo("==================== 0 ====================") + zkReady = true + logInfo("==================== Zookeeper Started ====================") - zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, - ZKStringSerializer) - logInfo("==================== 1 ====================") + zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + logInfo("==================== Zookeeper Client Created ====================") // Kafka broker startup var bindSuccess: Boolean = false @@ -80,9 +91,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin val brokerProps = getBrokerConfig() brokerConf = new KafkaConfig(brokerProps) server = new KafkaServer(brokerConf) - logInfo("==================== 2 ====================") server.startup() - logInfo("==================== 3 ====================") + logInfo("==================== Kafka Broker Started ====================") bindSuccess = true } catch { case e: KafkaException => @@ -94,10 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } Thread.sleep(2000) - logInfo("==================== 4 ====================") + logInfo("==================== Kafka + Zookeeper Ready ====================") + brokerReady = true } def tearDownKafka() { + brokerReady = false + zkReady = false if (producer != null) { producer.close() producer = null @@ -121,26 +134,23 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } } - private def createTestMessage(topic: String, sent: Map[String, Int]) - : Seq[KeyedMessage[String, String]] = { - val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { - new KeyedMessage[String, String](topic, s) - } - messages.toSeq - } - def createTopic(topic: String) { AdminUtils.createTopic(zkClient, topic, 1, 1) - logInfo("==================== 5 ====================") // wait until metadata is propagated waitUntilMetadataIsPropagated(topic, 0) + logInfo(s"==================== Topic $topic Created ====================") } - def produceAndSendMessage(topic: String, sent: Map[String, Int]) { + def sendMessages(topic: String, messageToFreq: Map[String, Int]) { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + def sendMessages(topic: String, messages: Array[String]) { producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) - producer.send(createTestMessage(topic, sent): _*) + producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) producer.close() - logInfo("==================== 6 ====================") + logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================") } private def getBrokerConfig(): Properties = { @@ -218,7 +228,7 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - produceAndSendMessage(topic, sent) + sendMessages(topic, sent) val kafkaParams = Map("zookeeper.connect" -> zkAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}", diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 64ccc92c81..fc53c23abd 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -79,7 +79,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter test("Reliable Kafka input stream with single topic") { var topic = "test-topic" createTopic(topic) - produceAndSendMessage(topic, data) + sendMessages(topic, data) // Verify whether the offset of this group/topic/partition is 0 before starting. assert(getCommitOffset(groupId, topic, 0) === None) @@ -111,7 +111,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) topics.foreach { case (t, _) => createTopic(t) - produceAndSendMessage(t, data) + sendMessages(t, data) } // Before started, verify all the group/topic/partition offsets are 0. -- cgit v1.2.3