diff options
author | jerryshao <saisai.shao@intel.com> | 2015-05-05 02:01:06 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-05 02:01:06 -0700 |
commit | 8436f7e98e674020007a9175973c6a1095b6774f (patch) | |
tree | fb31b238c605d6102ec00bbe2b020de415c6aab8 /external/kafka/src/main/scala | |
parent | 8776fe0b93b6e6d718738bcaf9838a2196e12c8a (diff) | |
download | spark-8436f7e98e674020007a9175973c6a1095b6774f.tar.gz spark-8436f7e98e674020007a9175973c6a1095b6774f.tar.bz2 spark-8436f7e98e674020007a9175973c6a1095b6774f.zip |
[SPARK-7113] [STREAMING] Support input information reporting for Direct Kafka stream
Author: jerryshao <saisai.shao@intel.com>
Closes #5879 from jerryshao/SPARK-7113 and squashes the following commits:
b0b506c [jerryshao] Address the comments
0babe66 [jerryshao] Support input information reporting for Direct Kafka stream
Diffstat (limited to 'external/kafka/src/main/scala')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 10 |
1 file changed, 7 insertions, 3 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 1b1fc8051d..6715aede79 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -17,7 +17,6 @@ package org.apache.spark.streaming.kafka - import scala.annotation.tailrec import scala.collection.mutable import scala.reflect.{classTag, ClassTag} @@ -27,10 +26,10 @@ import kafka.message.MessageAndMetadata import kafka.serializer.Decoder import org.apache.spark.{Logging, SparkException} -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.scheduler.InputInfo /** * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where @@ -117,6 +116,11 @@ class DirectKafkaInputDStream[ val rdd = KafkaRDD[K, V, U, T, R]( context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) + // Report the record number of this batch interval to InputInfoTracker. + val numRecords = rdd.offsetRanges.map(r => r.untilOffset - r.fromOffset).sum + val inputInfo = InputInfo(id, numRecords) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) + currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) } |