From 097e120c0c4132f007bfd0b0254b362ee9a02d8f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 12 Dec 2013 20:41:51 -0800 Subject: Refactored streaming scheduler and added listener interface. - Refactored Scheduler + JobManager to JobGenerator + JobScheduler and added JobSet for cleaner code. Moved scheduler-related code to the streaming.scheduler package. - Added StreamingListener trait (similar to SparkListener) to enable gathering of streaming stats like processing times and delays. Added StreamingContext.addListener() to add listeners. - Deduped some code in streaming tests by modifying TestSuiteBase, and added StreamingListenerSuite. --- core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 3841b5616d..2c5d87419d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -63,7 +63,7 @@ trait SparkListener { * Called when a task begins remotely fetching its result (will not be called for tasks that do * not need to fetch the result remotely). */ - def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { } + def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { } /** * Called when a task ends -- cgit v1.2.3 From b80ec05635132f96772545803a10a1bbfa1250e7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 18 Dec 2013 15:35:24 -0800 Subject: Added StatsReportListener to generate processing time statistics across multiple batches. --- .../org/apache/spark/scheduler/SparkListener.scala | 5 +-- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- .../streaming/scheduler/StreamingListener.scala | 45 +++++++++++++++++++++- 3 files changed, 46 insertions(+), 6 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 2c5d87419d..ee63b3c4a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -131,8 +131,8 @@ object StatsReportListener extends Logging { def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) { val stats = d.statCounter - logInfo(heading + stats) val quantiles = d.getQuantiles(probabilities).map{formatNumber} + logInfo(heading + stats) logInfo(percentilesHeader) logInfo("\t" + quantiles.mkString("\t")) } @@ -173,8 +173,6 @@ object StatsReportListener extends Logging { showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) } - - val seconds = 1000L val minutes = seconds * 60 val hours = minutes * 60 @@ -198,7 +196,6 @@ object StatsReportListener extends Logging { } - case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 14906fd720..69930f3b6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { jobSet.afterJobStop(job) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { - listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo())) jobSets.remove(jobSet.time) generator.onBatchCompletion(jobSet.time) logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0 )) + listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo())) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 49fd0d29c3..5647ffab8d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -17,14 +17,22 @@ package org.apache.spark.streaming.scheduler +import scala.collection.mutable.Queue +import org.apache.spark.util.Distribution + +/** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent -trait StreamingListener { +/** + * A listener interface for receiving information about an ongoing streaming + * computation. + */ +trait StreamingListener { /** * Called when processing of a batch has completed */ @@ -34,4 +42,39 @@ trait StreamingListener { * Called when processing of a batch has started */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } +} + + +/** + * A simple StreamingListener that logs summary statistics across Spark Streaming batches + * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) + */ +class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { + + import org.apache.spark + + val batchInfos = new Queue[BatchInfo]() + + override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { + addToQueue(batchStarted.batchInfo) + printStats() + } + + def addToQueue(newPoint: BatchInfo) { + batchInfos.enqueue(newPoint) + if (batchInfos.size > numBatchInfos) batchInfos.dequeue() + } + + def printStats() { + showMillisDistribution("Total delay: ", _.totalDelay) + showMillisDistribution("Processing time: ", _.processingDelay) + } + + def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) { + spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric)) + } + + def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } } \ No newline at end of file -- cgit v1.2.3 From ec71b445ad0440e84c4b4909e4faf75aba0f13d7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 18 Dec 2013 23:39:28 -0800 Subject: Minor changes. 
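
Among the changes below, StreamingContext.addListener() is renamed to
addStreamingListener(). A minimal sketch of registering a custom listener
through the renamed method; the BatchDelayLogger class and its printed output
are illustrative assumptions, not part of this patch:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Hypothetical listener that logs the processing delay of each completed batch.
    class BatchDelayLogger extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val info = batchCompleted.batchInfo
        // processingDelay is an Option[Long] in milliseconds; it is empty until
        // the batch has actually finished processing.
        info.processingDelay.foreach { delay =>
          println("Batch " + info.batchTime + " took " + delay + " ms")
        }
      }
    }

    // ssc is an existing StreamingContext:
    // ssc.addStreamingListener(new BatchDelayLogger)
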
--- .../org/apache/spark/scheduler/SparkListenerBus.scala | 1 - .../org/apache/spark/streaming/StreamingContext.scala | 10 ++++++---- .../spark/streaming/api/java/JavaStreamingContext.scala | 8 ++++++++ .../apache/spark/streaming/scheduler/BatchInfo.scala | 7 +++---- .../org/apache/spark/streaming/scheduler/Job.scala | 14 ++++---------- .../apache/spark/streaming/scheduler/JobGenerator.scala | 4 ++++ .../apache/spark/streaming/scheduler/JobScheduler.scala | 4 +++- .../spark/streaming/scheduler/StreamingListener.scala | 17 ++++++----------- .../streaming/scheduler/StreamingListenerBus.scala | 2 +- .../apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 10 files changed, 36 insertions(+), 33 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index d5824e7954..85687ea330 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging { return true } } - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index fedbbde80c..41da028a3c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -513,7 +513,10 @@ class StreamingContext private ( graph.addOutputStream(outputStream) } - def addListener(streamingListener: StreamingListener) { + /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ + def addStreamingListener(streamingListener: StreamingListener) { scheduler.listenerBus.addListener(streamingListener) } @@ -532,20 +535,19 @@ class StreamingContext private ( * Start the execution of the streams. */ def start() { - validate() + // Get the network input streams val networkInputStreams = graph.getInputStreams().filter(s => s match { case n: NetworkInputDStream[_] => true case _ => false }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray + // Start the network input tracker (must start before receivers) if (networkInputStreams.length > 0) { - // Start the network input tracker (must start before receivers) networkInputTracker = new NetworkInputTracker(this, networkInputStreams) networkInputTracker.start() } - Thread.sleep(1000) // Start the scheduler diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 80dcf87491..78d318cf27 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.StreamingListener /** * A StreamingContext is the main entry point for Spark Streaming functionality. 
Besides the basic @@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.remember(duration) } + /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ + def addStreamingListener(streamingListener: StreamingListener) { + ssc.addStreamingListener(streamingListener) + } + /** * Starts the execution of the streams. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 798598ad50..88e4af59b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -19,6 +19,9 @@ package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.Time +/** + * Class having information on completed batches. + */ case class BatchInfo( batchTime: Time, submissionTime: Long, @@ -32,7 +35,3 @@ case class BatchInfo( def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption } - - - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index bca5e1f1a5..7341bfbc99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -17,9 +17,11 @@ package org.apache.spark.streaming.scheduler -import java.util.concurrent.atomic.AtomicLong import org.apache.spark.streaming.Time +/** + * Class representing a Spark computation. It may contain multiple Spark jobs. + */ private[streaming] class Job(val time: Time, func: () => _) { var id: String = _ @@ -36,12 +38,4 @@ class Job(val time: Time, func: () => _) { } override def toString = id -} -/* -private[streaming] -object Job { - val id = new AtomicLong(0) - - def getNewId() = id.getAndIncrement() -} -*/ +} \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 5d3ce9c398..1cd0b9b0a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -22,6 +22,10 @@ import org.apache.spark.Logging import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} +/** + * This class generates jobs from DStreams as well as drives checkpointing and cleaning + * up DStream metadata. + */ private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 69930f3b6c..33c5322358 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -23,7 +23,9 @@ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} import scala.collection.mutable.HashSet import org.apache.spark.streaming._ - +/** + * This class drives the generation of Spark jobs from the DStreams. 
+ */ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 5647ffab8d..36225e190c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -50,19 +50,13 @@ trait StreamingListener { * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) */ class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { - - import org.apache.spark - + // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { - addToQueue(batchStarted.batchInfo) - printStats() - } - - def addToQueue(newPoint: BatchInfo) { - batchInfos.enqueue(newPoint) + batchInfos.enqueue(batchStarted.batchInfo) if (batchInfos.size > numBatchInfos) batchInfos.dequeue() + printStats() } def printStats() { @@ -71,10 +65,11 @@ class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { } def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) { - spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric)) + org.apache.spark.scheduler.StatsReportListener.showMillisDistribution( + heading, extractDistribution(getMetric)) } def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) } -} \ No newline at end of file +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 324e491914..110a20f282 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -78,4 +78,4 @@ private[spark] class StreamingListenerBus() extends Logging { } return true } -} \ No newline at end of file +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 826c839932..16410a21e3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -34,7 +34,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{ test("basic BatchInfo generation") { val ssc = setupStreams(input, operation) val collector = new BatchInfoCollector - ssc.addListener(collector) + ssc.addStreamingListener(collector) runStreams(ssc, input.size, input.size) val batchInfos = collector.batchInfos batchInfos should have size 4 -- cgit v1.2.3
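
Usage note: taken together, the patches above add StreamingContext.addStreamingListener()
and a streaming StatsReportListener that logs distributions of total delay and
processing time over the last numBatchInfos completed batches. A minimal sketch of
wiring it into an application; the master URL, application name, batch interval, and
socket source below are illustrative assumptions, not taken from these patches:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.scheduler.StatsReportListener

    // Illustrative context with one-second batches; master and app name are placeholders.
    val ssc = new StreamingContext("local[2]", "StatsDemo", Seconds(1))

    // Log delay distributions over the 10 most recently completed batches
    // (10 is the default window size).
    ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 10))

    // Any output operation works; a socket text stream is used here for illustration.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
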