diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 15:35:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 15:35:24 -0800 |
commit | b80ec05635132f96772545803a10a1bbfa1250e7 (patch) | |
tree | 999562e0dc469fa2f767c43b884af5f78a8d6b34 /streaming/src | |
parent | 097e120c0c4132f007bfd0b0254b362ee9a02d8f (diff) | |
download | spark-b80ec05635132f96772545803a10a1bbfa1250e7.tar.gz spark-b80ec05635132f96772545803a10a1bbfa1250e7.tar.bz2 spark-b80ec05635132f96772545803a10a1bbfa1250e7.zip |
Added StatsReportListener to generate processing time statistics across multiple batches.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala | 45 |
2 files changed, 45 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 14906fd720..69930f3b6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { jobSet.afterJobStop(job) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { - listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo())) jobSets.remove(jobSet.time) generator.onBatchCompletion(jobSet.time) logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0 )) + listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo())) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 49fd0d29c3..5647ffab8d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -17,14 +17,22 @@ package org.apache.spark.streaming.scheduler +import scala.collection.mutable.Queue +import org.apache.spark.util.Distribution + +/** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent -trait StreamingListener { +/** + * A listener interface for receiving information about an ongoing streaming + * computation. + */ +trait StreamingListener { /** * Called when processing of a batch has completed */ @@ -34,4 +42,39 @@ trait StreamingListener { * Called when processing of a batch has started */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } +} + + +/** + * A simple StreamingListener that logs summary statistics across Spark Streaming batches + * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) + */ +class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { + + import org.apache.spark + + val batchInfos = new Queue[BatchInfo]() + + override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { + addToQueue(batchStarted.batchInfo) + printStats() + } + + def addToQueue(newPoint: BatchInfo) { + batchInfos.enqueue(newPoint) + if (batchInfos.size > numBatchInfos) batchInfos.dequeue() + } + + def printStats() { + showMillisDistribution("Total delay: ", _.totalDelay) + showMillisDistribution("Processing time: ", _.processingDelay) + } + + def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) { + spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric)) + } + + def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } }
\ No newline at end of file |