From 3ddbdbfbc71486cd5076d875f82796a880d2dccb Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 20 Dec 2013 19:51:37 -0800 Subject: Minor updated based on comments on PR 277. --- .../scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 4 +++- .../src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 33c5322358..9511ccfbed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.HashSet import org.apache.spark.streaming._ /** - * This class drives the generation of Spark jobs from the DStreams. + * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate + * the jobs and runs them using a thread pool. */ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { @@ -91,6 +92,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } } + private[streaming] class JobHandler(job: Job) extends Runnable { def run() { beforeJobStart(job) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 05233d095b..cf7431a8a3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -20,6 +20,9 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.HashSet import org.apache.spark.streaming.Time +/** Class representing a set of Jobs + * belonging to the same batch. 
+ */ private[streaming] case class JobSet(time: Time, jobs: Seq[Job]) { -- cgit v1.2.3 From dc3ee6b6122229cd99a133baf10a46dac2f7e9e2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Dec 2013 11:30:42 -0800 Subject: Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277. --- .../apache/spark/streaming/scheduler/BatchInfo.scala | 19 +++++++++++++++++++ .../org/apache/spark/streaming/scheduler/JobSet.scala | 10 +++++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 88e4af59b7..e3fb07624e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time /** * Class having information on completed batches. + * @param batchTime Time of the batch + * @param submissionTime Clock time of when jobs of this batch were submitted to + * the streaming scheduler queue + * @param processingStartTime Clock time of when the first job of this batch started processing + * @param processingEndTime Clock time of when the last job of this batch finished processing + * */ case class BatchInfo( batchTime: Time, @@ -29,9 +35,22 @@ case class BatchInfo( processingEndTime: Option[Long] ) { + /** + * Time taken for the first job of this batch to start processing from the time this batch + * was submitted to the streaming scheduler. Essentially, it is + * `processingStartTime` - `submissionTime`. + */ def schedulingDelay = processingStartTime.map(_ - submissionTime) + /** + * Time taken for all the jobs of this batch to finish processing from the time they started + * processing. Essentially, it is `processingEndTime` - `processingStartTime`. 
+ */ def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption + /** + * Time taken for all the jobs of this batch to finish processing from the time they + * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. + */ def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index cf7431a8a3..57268674ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -27,9 +27,9 @@ private[streaming] case class JobSet(time: Time, jobs: Seq[Job]) { private val incompleteJobs = new HashSet[Job]() - var submissionTime = System.currentTimeMillis() - var processingStartTime = -1L - var processingEndTime = -1L + var submissionTime = System.currentTimeMillis() // when this jobset was submitted + var processingStartTime = -1L // when the first job of this jobset started processing + var processingEndTime = -1L // when the last job of this jobset finished processing jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) } incompleteJobs ++= jobs @@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def hasCompleted() = incompleteJobs.isEmpty + // Time taken to process all the jobs from the time they started processing + // (i.e. not including the time they wait in the streaming scheduler queue) def processingDelay = processingEndTime - processingStartTime + // Time taken to process all the jobs from the time they were submitted + // (i.e. 
including the time they wait in the streaming scheduler queue) def totalDelay = { processingEndTime - time.milliseconds } -- cgit v1.2.3 From f9771690a698b6ce5d29eb36b38bbeb498d1af0d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Dec 2013 11:32:26 -0800 Subject: Minor formatting fixes. --- .../scala/org/apache/spark/streaming/scheduler/BatchInfo.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index e3fb07624e..4e8d07fe92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -26,7 +26,6 @@ import org.apache.spark.streaming.Time * the streaming scheduler queue * @param processingStartTime Clock time of when the first job of this batch started processing * @param processingEndTime Clock time of when the last job of this batch finished processing - * */ case class BatchInfo( batchTime: Time, @@ -48,9 +47,9 @@ case class BatchInfo( */ def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption - /** - * Time taken for all the jobs of this batch to finish processing from the time they - * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. - */ + /** + * Time taken for all the jobs of this batch to finish processing from the time they + * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. + */ def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption } -- cgit v1.2.3 From 6eaa0505493511adb040257abc749fcd774bbb68 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Dec 2013 15:55:45 -0800 Subject: Minor change for PR 277. 
--- .../test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 16410a21e3..fa64142096 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers -class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{ +class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { val input = (1 to 4).map(Seq(_)).toSeq val operation = (d: DStream[Int]) => d.map(x => x) -- cgit v1.2.3