 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 61 ++++++++++++++
 1 file changed, 61 insertions(+), 0 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index ad2fb8aa5f..fe57222052 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -51,6 +51,7 @@ object KafkaUtils {
* in its own thread
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
ssc: StreamingContext,
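
A minimal sketch of the documented contract (not part of the patch): the stream this overload returns carries (key, value) pairs. The ZooKeeper quorum, group id, and topic name below are illustrative.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-demo"), Seconds(2))
    // The DStream elements are (Kafka message key, Kafka message value);
    // keep only the values here.
    val values = KafkaUtils.createStream(ssc, "localhost:2181", "demo-group", Map("events" -> 1))
      .map(_._2)
    values.print()
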
@@ -74,6 +75,11 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
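
For the decoder-parameterized overload, a sketch under the same illustrative assumptions, with both decoders taken to be kafka.serializer.StringDecoder:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // U and T (here both StringDecoder) turn the raw bytes into K and V.
    def typedStream(ssc: StreamingContext) =
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc,
        Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "demo-group"),
        Map("events" -> 1),
        StorageLevel.MEMORY_AND_DISK_SER_2)
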
@@ -93,6 +99,7 @@ object KafkaUtils {
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
jssc: JavaStreamingContext,
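
The Java-facing overloads mirror their Scala counterparts; a sketch calling this one from Scala (names and addresses illustrative):

    import org.apache.spark.streaming.api.java.JavaStreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Returns a Java pair DStream of (key, value); the topics map uses boxed Integers.
    def javaStream(jssc: JavaStreamingContext) =
      KafkaUtils.createStream(jssc, "localhost:2181", "demo-group",
        java.util.Collections.singletonMap("events", Int.box(1)))
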
@@ -111,6 +118,7 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
jssc: JavaStreamingContext,
@@ -135,6 +143,11 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel RDD storage level.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam U type of Kafka message key decoder
+ * @tparam T type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
@@ -219,6 +232,11 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
*/
def createRDD[
K: ClassTag,
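
A sketch of the batch API under the same illustrative assumptions (broker at localhost:9092, offsets 0 to 100 of partition 0 of topic "events"):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // Replays a fixed slice of the topic as an RDD of (key, value) pairs.
    def replay(sc: SparkContext) =
      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc,
        Map("metadata.broker.list" -> "localhost:9092"),
        Array(OffsetRange("events", 0, fromOffset = 0L, untilOffset = 100L)))
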
@@ -251,6 +269,12 @@ object KafkaUtils {
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
*/
def createRDD[
K: ClassTag,
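
When a messageHandler is supplied, R is whatever the handler returns. A sketch mapping each record to (offset, value), passing an empty leader map so leaders are looked up on the driver:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    def replayWithOffsets(sc: SparkContext) =
      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
        sc,
        Map("metadata.broker.list" -> "localhost:9092"),
        Array(OffsetRange("events", 0, 0L, 100L)),
        Map.empty[TopicAndPartition, Broker],  // empty: look leaders up on the driver
        (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message()))
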
@@ -288,6 +312,15 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
+ * @param keyClass type of Kafka message key
+ * @param valueClass type of Kafka message value
+ * @param keyDecoderClass type of Kafka message key decoder
+ * @param valueDecoderClass type of Kafka message value decoder
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return RDD of (Kafka message key, Kafka message value)
*/
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jsc: JavaSparkContext,
@@ -321,6 +354,12 @@ object KafkaUtils {
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return RDD of R
*/
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jsc: JavaSparkContext,
@@ -373,6 +412,12 @@ object KafkaUtils {
* @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
*/
def createDirectStream[
K: ClassTag,
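
A sketch of the direct stream with explicit starting offsets, where R is again (offset, value); broker address and offsets are illustrative:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // fromOffsets pins the (inclusive) starting point of each partition.
    def directFrom(ssc: StreamingContext) =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
        ssc,
        Map("metadata.broker.list" -> "localhost:9092"),
        Map(TopicAndPartition("events", 0) -> 0L),
        (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message()))
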
@@ -419,6 +464,11 @@ object KafkaUtils {
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createDirectStream[
K: ClassTag,
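
And the topic-set variant, a sketch in which "auto.offset.reset" picks the starting point instead of explicit offsets:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Without fromOffsets the stream starts per "auto.offset.reset"
    // ("smallest" here; the default is "largest").
    def direct(ssc: StreamingContext) =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        Map("metadata.broker.list" -> "localhost:9092",
            "auto.offset.reset" -> "smallest"),
        Set("events"))
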
@@ -470,6 +520,12 @@ object KafkaUtils {
* @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @tparam R type returned by messageHandler
+ * @return DStream of R
*/
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jssc: JavaStreamingContext,
@@ -529,6 +585,11 @@ object KafkaUtils {
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ * @tparam KD type of Kafka message key decoder
+ * @tparam VD type of Kafka message value decoder
+ * @return DStream of (Kafka message key, Kafka message value)
*/
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jssc: JavaStreamingContext,