author    jerryshao <saisai.shao@intel.com>  2015-05-05 02:01:06 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-05-05 02:01:06 -0700
commit    8436f7e98e674020007a9175973c6a1095b6774f (patch)
tree      fb31b238c605d6102ec00bbe2b020de415c6aab8 /external/kafka/src/main
parent    8776fe0b93b6e6d718738bcaf9838a2196e12c8a (diff)
[SPARK-7113] [STREAMING] Support input information reporting for Direct Kafka stream
Author: jerryshao <saisai.shao@intel.com>

Closes #5879 from jerryshao/SPARK-7113 and squashes the following commits:

b0b506c [jerryshao] Address the comments
0babe66 [jerryshao] Support input information reporting for Direct Kafka stream
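For context, a minimal sketch (not part of this commit) of the kind of direct stream the patch instruments. The broker address, batch interval, and topic name ("localhost:9092", 10 seconds, "logs") are illustrative assumptions; with SPARK-7113, the records such a stream consumes in each batch are reported to the streaming UI via InputInfoTracker.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical broker and topic; substitute real values.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("logs"))

    // Any output operation triggers batches; with this patch the per-batch
    // record count shows up in the streaming UI's input statistics.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}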
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 10
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1b1fc8051d..6715aede79 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -17,7 +17,6 @@
package org.apache.spark.streaming.kafka
-
import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}
@@ -27,10 +26,10 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.InputInfo
/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -117,6 +116,11 @@ class DirectKafkaInputDStream[
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+ // Report the number of records in this batch interval to InputInfoTracker.
+ val numRecords = rdd.offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
+ val inputInfo = InputInfo(id, numRecords)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}
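For reference, a standalone sketch of how the reported count falls out of the offset ranges: each OffsetRange covers the half-open interval [fromOffset, untilOffset), so its size is untilOffset - fromOffset, and the batch total is the sum over partitions. The topic name and offsets below are made up.

import org.apache.spark.streaming.kafka.OffsetRange

object RecordCountSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical ranges for one batch over two partitions of topic "logs".
    val offsetRanges = Seq(
      OffsetRange("logs", 0, 100L, 150L), // 50 records
      OffsetRange("logs", 1, 200L, 230L)  // 30 records
    )
    // Same expression the patch uses to compute the batch's record count.
    val numRecords = offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
    println(s"Records in this batch: $numRecords") // prints 80
  }
}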