author: zsxwing <zsxwing@gmail.com>  2015-07-09 13:48:29 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com>  2015-07-09 13:48:29 -0700
commit: 1f6b0b1234cc03aa2e07aea7fec2de7563885238 (patch)
tree: c1a8c2d2f750ca5d8db14f6bf1558d58649396e5 /external/kafka
parent: c4830598b271cc6390d127bd4cf8ab02b28792e0 (diff)
[SPARK-8701] [STREAMING] [WEBUI] Add input metadata in the batch page
This PR adds `metadata` to `InputInfo`. An `InputDStream` can report its metadata for a batch, and it will be shown in the batch page. For example:

![screen shot](https://cloud.githubusercontent.com/assets/1000778/8403741/d6ffc7e2-1e79-11e5-9888-c78c1575123a.png)

`FileInputDStream` will display the new files for a batch, and `DirectKafkaInputDStream` will display its offset ranges.

Author: zsxwing <zsxwing@gmail.com>

Closes #7081 from zsxwing/input-metadata and squashes the following commits:

f7abd9b [zsxwing] Revert the space changes in project/MimaExcludes.scala
d906209 [zsxwing] Merge branch 'master' into input-metadata
74762da [zsxwing] Fix MiMa tests
7903e33 [zsxwing] Merge branch 'master' into input-metadata
450a46c [zsxwing] Address comments
1d94582 [zsxwing] Rename InputInfo to StreamInputInfo and change "metadata" to Map[String, Any]
d496ae9 [zsxwing] Add input metadata in the batch page
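For illustration, here is a minimal sketch of the reporting type this patch introduces. The positional constructor and the `METADATA_KEY_DESCRIPTION` constant follow the patch itself; the stream id, record count, and description string are hypothetical:

```scala
import org.apache.spark.streaming.scheduler.StreamInputInfo

// Hypothetical values: stream id 0, 42 records in this batch.
// The "Description" entry (StreamInputInfo.METADATA_KEY_DESCRIPTION) is the
// human-readable string the batch page renders; other keys can carry
// structured metadata, like the "offsets" entry DirectKafkaInputDStream adds.
val inputInfo = StreamInputInfo(0, 42L, Map(
  StreamInputInfo.METADATA_KEY_DESCRIPTION -> "new files: /data/part-00000"))
```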
Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 23
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala | 2
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 876456c964..48a1933d92 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka
import scala.annotation.tailrec
import scala.collection.mutable
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.InputInfo
+import org.apache.spark.streaming.scheduler.StreamInputInfo
/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -119,8 +119,23 @@ class DirectKafkaInputDStream[
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
- // Report the record number of this batch interval to InputInfoTracker.
- val inputInfo = InputInfo(id, rdd.count)
+ // Report the record number and metadata of this batch interval to InputInfoTracker.
+ val offsetRanges = currentOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to an immutable.List to prevent it from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
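Standalone, the description built above behaves like this sketch. `OffsetRange.apply` is the class's public constructor; the topic and offsets are hypothetical. Note that empty ranges are filtered out, so the batch page only lists partitions that actually received records:

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// Hypothetical ranges: partition 1 received no records and is filtered out.
val ranges = Seq(
  OffsetRange("events", 0, 100L, 150L),
  OffsetRange("events", 1, 200L, 200L))

val description = ranges.filter { r =>
  r.fromOffset != r.untilOffset            // drop empty ranges
}.map { r =>
  s"topic: ${r.topic}\tpartition: ${r.partition}\t" +
    s"offsets: ${r.fromOffset} to ${r.untilOffset}"
}.mkString("\n")
// description == "topic: events\tpartition: 0\toffsets: 100 to 150"
```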
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 2675042666..f326e7f1f6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -75,7 +75,7 @@ final class OffsetRange private(
}
override def toString(): String = {
- s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]"
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
}
/** this is to avoid ClassNotFoundException during checkpoint restore */
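The change above simply balances the closing parenthesis in `toString`. With hypothetical values:

```scala
import org.apache.spark.streaming.kafka.OffsetRange

val range = OffsetRange("events", 3, 100L, 200L)
println(range)
// before the fix: OffsetRange(topic: 'events', partition: 3, range: [100 -> 200]
// after the fix:  OffsetRange(topic: 'events', partition: 3, range: [100 -> 200])
```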