aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-12-25 19:39:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-25 19:39:49 -0800
commitf205fe477c33a541053c198cd43a5811d6cf9fe2 (patch)
treef42adec11f8580e67e1ab3a16a54619f0f000c0a /streaming
parentac8278593ea68dd3be8cddf8cd5ce739f163ab84 (diff)
downloadspark-f205fe477c33a541053c198cd43a5811d6cf9fe2.tar.gz
spark-f205fe477c33a541053c198cd43a5811d6cf9fe2.tar.bz2
spark-f205fe477c33a541053c198cd43a5811d6cf9fe2.zip
[SPARK-4537][Streaming] Expand StreamingSource to add more metrics
Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao <saisai.shao@intel.com> Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala53
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala19
2 files changed, 57 insertions, 15 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568ddf..9697437dd2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
private val streamingListener = ssc.progressListener
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
- defaultValue: T) {
+ defaultValue: T): Unit = {
+ registerGaugeWithOption[T](name,
+ (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
+ }
+
+ private def registerGaugeWithOption[T](
+ name: String,
+ f: StreamingJobProgressListener => Option[T],
+ defaultValue: T): Unit = {
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
- override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
+ override def getValue: T = f(streamingListener).getOrElse(defaultValue)
})
}
@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for number of total completed batches
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
+ // Gauge for number of total received records
+ registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+ // Gauge for number of total processed records
+ registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
+
// Gauge for number of unprocessed batches
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for last completed batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
- registerGauge("lastCompletedBatch_submissionTime",
- _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
- registerGauge("lastCompletedBatch_processStartTime",
- _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
- registerGauge("lastCompletedBatch_processEndTime",
- _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+ registerGaugeWithOption("lastCompletedBatch_submissionTime",
+ _.lastCompletedBatch.map(_.submissionTime), -1L)
+ registerGaugeWithOption("lastCompletedBatch_processingStartTime",
+ _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+ registerGaugeWithOption("lastCompletedBatch_processingEndTime",
+ _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+ // Gauge for last completed batch's delay information.
+ registerGaugeWithOption("lastCompletedBatch_processingDelay",
+ _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+ registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+ _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+ registerGaugeWithOption("lastCompletedBatch_totalDelay",
+ _.lastCompletedBatch.flatMap(_.totalDelay), -1L)
// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
- registerGauge("lastReceivedBatch_submissionTime",
- _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
- registerGauge("lastReceivedBatch_processStartTime",
- _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
- registerGauge("lastReceivedBatch_processEndTime",
- _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+ registerGaugeWithOption("lastReceivedBatch_submissionTime",
+ _.lastCompletedBatch.map(_.submissionTime), -1L)
+ registerGaugeWithOption("lastReceivedBatch_processingStartTime",
+ _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+ registerGaugeWithOption("lastReceivedBatch_processingEndTime",
+ _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+ // Gauge for last received batch records.
+ registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index f61069b56d..5ee53a5c5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
-import org.apache.spark.Logging
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -36,6 +35,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val completedaBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
+ private var totalReceivedRecords = 0L
+ private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -65,6 +66,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+
+ batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+ totalReceivedRecords += infos.map(_.numRecords).sum
+ }
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -73,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
totalCompletedBatches += 1L
+
+ batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+ totalProcessedRecords += infos.map(_.numRecords).sum
+ }
}
def numReceivers = synchronized {
@@ -83,6 +92,14 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}
+ def numTotalReceivedRecords: Long = synchronized {
+ totalReceivedRecords
+ }
+
+ def numTotalProcessedRecords: Long = synchronized {
+ totalProcessedRecords
+ }
+
def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
}