diff options
Diffstat (limited to 'external/kafka/src/main/scala/org/apache')
7 files changed, 348 insertions, 122 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index c7bca43eb8..04e65cb3d7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream * @param messageHandler function for translating each message into the desired type - * @param maxRetries maximum number of times in a row to retry getting leaders' offsets */ private[streaming] class DirectKafkaInputDStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + U <: Decoder[K]: ClassTag, + T <: Decoder[V]: ClassTag, R: ClassTag]( @transient ssc_ : StreamingContext, val kafkaParams: Map[String, String], diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index ccc62bfe8f..2f7e0ab39f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -332,6 +332,9 @@ object KafkaCluster { extends ConsumerConfig(originalProps) { val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp => val hpa = hp.split(":") + if (hpa.size == 1) { + throw new SparkException(s"Broker not the in correct format of <host>:<port> [$brokers]") + } (hpa(0), hpa(1).toInt) } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 50bf7cbdb8..d56cc01be9 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param batch Each KafkaRDDPartition in the batch corresponds to a - * range of offsets for a given Kafka topic/partition + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */ -private[spark] +private[kafka] class KafkaRDD[ K: ClassTag, V: ClassTag, @@ -183,7 +181,7 @@ class KafkaRDD[ } } -private[spark] +private[kafka] object KafkaRDD { import KafkaCluster.LeaderOffset diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala index 36372e08f6..a842a6f177 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala @@ -26,7 +26,7 @@ import org.apache.spark.Partition * @param host preferred kafka host, i.e. the leader at the time the rdd was created * @param port preferred kafka host's port */ -private[spark] +private[kafka] class KafkaRDDPartition( val index: Int, val topic: String, @@ -36,24 +36,3 @@ class KafkaRDDPartition( val host: String, val port: Int ) extends Partition - -private[spark] -object KafkaRDDPartition { - def apply( - index: Int, - topic: String, - partition: Int, - fromOffset: Long, - untilOffset: Long, - host: String, - port: Int - ): KafkaRDDPartition = new KafkaRDDPartition( - index, - topic, - partition, - fromOffset, - untilOffset, - host, - port - ) -} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index f8aa6c5c62..7a2c3abdcc 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -18,7 +18,9 @@ package org.apache.spark.streaming.kafka import java.lang.{Integer => JInt} +import java.lang.{Long => JLong} import java.util.{Map => JMap} +import java.util.{Set => JSet} import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,18 +29,19 @@ import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.{Decoder, StringDecoder} - +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} object KafkaUtils { /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer @@ -62,7 +65,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param kafkaParams Map of kafka configuration parameters, * see http://kafka.apache.org/08/configuration.html @@ -81,7 +84,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) @@ -99,7 +102,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. @@ -119,10 +122,10 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD + * @param keyTypeClass Key type of DStream + * @param valueTypeClass value type of Dstream * @param keyDecoderClass Type of kafka key decoder * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration parameters, @@ -151,14 +154,14 @@ object KafkaUtils { jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } - /** A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * * @param sc SparkContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ @@ -166,12 +169,12 @@ object KafkaUtils { def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = { + ): RDD[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet @@ -179,121 +182,196 @@ object KafkaUtils { errs => throw new SparkException(errs.mkString("\n")), ok => ok ) - new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) } - /** A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * * @param sc SparkContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition * @param leaders Kafka leaders for each offset range in batch - * @param messageHandler function for translating each message into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, + R: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange], leaders: Array[Leader], messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = { - + ): RDD[R] = { val leaderMap = leaders .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port)) .toMap - new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) } + /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * Create a RDD from Kafka using offset ranges for each topic and partition. * - * Points to note: - * - * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them - * as the fromOffsets parameter on restart. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + */ + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange] + ): JavaPairRDD[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + new JavaPairRDD(createRDD[K, V, KD, VD]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + } + + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka leaders for each offset range in batch + * @param messageHandler Function for translating each message and metadata into the desired type + */ + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaRDD[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createRDD[K, V, KD, VD, R]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) + } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to either ensure that the output operation is - * idempotent, or transactionally store offsets with the output. See the programming guide for - * more details. + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param messageHandler function for translating each message into the desired type - * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { - new DirectKafkaInputDStream[K, V, U, T, R]( + new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, messageHandler) } /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: - * - * Failure Recovery - You must checkpoint this stream. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide - * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. - * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to ensure that the output operation is idempotent. + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" * to determine where the stream starts (defaults to "largest") - * @param topics names of the topics to consume + * @param topics Names of the topics to consume */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] @@ -313,11 +391,128 @@ object KafkaUtils { val fromOffsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) } - new DirectKafkaInputDStream[K, V, U, T, (K, V)]( + new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }).fold( errs => throw new SparkException(errs.mkString("\n")), ok => ok ) } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class of the value decoder + * @param recordClass Class of the records in DStream + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + fromOffsets: JMap[TopicAndPartition, JLong], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaInputDStream[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createDirectStream[K, V, KD, VD, R]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), + messageHandler.call _ + ) + } + + /** + * :: Experimental :: + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class type of the value decoder + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * to determine where the stream starts (defaults to "largest") + * @param topics Names of the topics to consume + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + topics: JSet[String] + ): JavaPairInputDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + createDirectStream[K, V, KD, VD]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Set(topics.toSeq: _*) + ) + } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala index 3454d92e72..c129a26836 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala @@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Host info for the leader of a Kafka TopicAndPartition */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represent the host info for the leader of a Kafka partition. + */ +@Experimental final class Leader private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, - /** kafka hostname */ + /** Leader's hostname */ val host: String, - /** kafka host's port */ + /** Leader's port */ val port: Int) extends Serializable +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[Leader]]. + */ +@Experimental object Leader { def create(topic: String, partition: Int, host: String, port: Int): Leader = new Leader(topic, partition, host, port) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 334c12e462..9c3dfeb8f5 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Something that has a collection of OffsetRanges */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +@Experimental trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] } -/** Represents a range of offsets from a single Kafka TopicAndPartition */ +/** + * :: Experimental :: + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. + */ +@Experimental final class OffsetRange private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, /** inclusive starting offset */ val fromOffset: Long, @@ -36,11 +55,33 @@ final class OffsetRange private( val untilOffset: Long) extends Serializable { import OffsetRange.OffsetRangeTuple + override def equals(obj: Any): Boolean = obj match { + case that: OffsetRange => + this.topic == that.topic && + this.partition == that.partition && + this.fromOffset == that.fromOffset && + this.untilOffset == that.untilOffset + case _ => false + } + + override def hashCode(): Int = { + toTuple.hashCode() + } + + override def toString(): String = { + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]" + } + /** this is to avoid ClassNotFoundException during checkpoint restore */ private[streaming] def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) } +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[OffsetRange]]. + */ +@Experimental object OffsetRange { def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset) @@ -61,10 +102,10 @@ object OffsetRange { new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) /** this is to avoid ClassNotFoundException during checkpoint restore */ - private[spark] + private[kafka] type OffsetRangeTuple = (String, Int, Long, Long) - private[streaming] + private[kafka] def apply(t: OffsetRangeTuple) = new OffsetRange(t._1, t._2, t._3, t._4) } |