aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-18 15:35:24 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-18 15:35:24 -0800
commitb80ec05635132f96772545803a10a1bbfa1250e7 (patch)
tree999562e0dc469fa2f767c43b884af5f78a8d6b34 /streaming
parent097e120c0c4132f007bfd0b0254b362ee9a02d8f (diff)
downloadspark-b80ec05635132f96772545803a10a1bbfa1250e7.tar.gz
spark-b80ec05635132f96772545803a10a1bbfa1250e7.tar.bz2
spark-b80ec05635132f96772545803a10a1bbfa1250e7.zip
Added StatsReportListener to generate processing time statistics across multiple batches.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala45
2 files changed, 45 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 14906fd720..69930f3b6c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
jobSet.afterJobStop(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
- listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
jobSets.remove(jobSet.time)
generator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
+ listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 49fd0d29c3..5647ffab8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -17,14 +17,22 @@
package org.apache.spark.streaming.scheduler
+import scala.collection.mutable.Queue
+import org.apache.spark.util.Distribution
+
+/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
-trait StreamingListener {
+/**
+ * A listener interface for receiving information about an ongoing streaming
+ * computation.
+ */
+trait StreamingListener {
/**
* Called when processing of a batch has completed
*/
@@ -34,4 +42,39 @@ trait StreamingListener {
* Called when processing of a batch has started
*/
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
+
+
+/**
+ * A simple StreamingListener that logs summary statistics across Spark Streaming batches
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ */
+class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+
+ import org.apache.spark
+
+ val batchInfos = new Queue[BatchInfo]()
+
+ override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
+ addToQueue(batchStarted.batchInfo)
+ printStats()
+ }
+
+ def addToQueue(newPoint: BatchInfo) {
+ batchInfos.enqueue(newPoint)
+ if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+ }
+
+ def printStats() {
+ showMillisDistribution("Total delay: ", _.totalDelay)
+ showMillisDistribution("Processing time: ", _.processingDelay)
+ }
+
+ def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
+ spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+ }
+
+ def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+ Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
} \ No newline at end of file