diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 17:49:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 17:49:41 -0800 |
commit | e9165d2a391c73d7e06436426047759aa62807c2 (patch) | |
tree | e7f7d162c951e5f111778f2a00635bc1ce6dc278 | |
parent | 61f4bbda0d4e3ecbd8b955232a741231936a25de (diff) | |
parent | 6eaa0505493511adb040257abc749fcd774bbb68 (diff) | |
download | spark-e9165d2a391c73d7e06436426047759aa62807c2.tar.gz spark-e9165d2a391c73d7e06436426047759aa62807c2.tar.bz2 spark-e9165d2a391c73d7e06436426047759aa62807c2.zip |
Merge branch 'scheduler-update' into window-improvement
4 files changed, 32 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 88e4af59b7..4e8d07fe92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -21,6 +21,11 @@ import org.apache.spark.streaming.Time /** * Class having information on completed batches. + * @param batchTime Time of the batch + * @param submissionTime Clock time of when jobs of this batch was submitted to + * the streaming scheduler queue + * @param processingStartTime Clock time of when the first job of this batch started processing + * @param processingEndTime Clock time of when the last job of this batch finished processing */ case class BatchInfo( batchTime: Time, @@ -29,9 +34,22 @@ case class BatchInfo( processingEndTime: Option[Long] ) { + /** + * Time taken for the first job of this batch to start processing from the time this batch + * was submitted to the streaming scheduler. Essentially, it is + * `processingStartTime` - `submissionTime`. + */ def schedulingDelay = processingStartTime.map(_ - submissionTime) + /** + * Time taken for the all jobs of this batch to finish processing from the time they started + * processing. Essentially, it is `processingEndTime` - `processingStartTime`. + */ def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption + /** + * Time taken for all the jobs of this batch to finish processing from the time they + * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. + */ def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 33c5322358..9511ccfbed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.HashSet import org.apache.spark.streaming._ /** - * This class drives the generation of Spark jobs from the DStreams. + * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate + * the jobs and runs them using a thread pool. Number of threads */ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { @@ -91,6 +92,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } } + private[streaming] class JobHandler(job: Job) extends Runnable { def run() { beforeJobStart(job) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 05233d095b..57268674ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -20,13 +20,16 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.HashSet import org.apache.spark.streaming.Time +/** Class representing a set of Jobs + * belong to the same batch. + */ private[streaming] case class JobSet(time: Time, jobs: Seq[Job]) { private val incompleteJobs = new HashSet[Job]() - var submissionTime = System.currentTimeMillis() - var processingStartTime = -1L - var processingEndTime = -1L + var submissionTime = System.currentTimeMillis() // when this jobset was submitted + var processingStartTime = -1L // when the first job of this jobset started processing + var processingEndTime = -1L // when the last job of this jobset finished processing jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) } incompleteJobs ++= jobs @@ -44,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def hasCompleted() = incompleteJobs.isEmpty + // Time taken to process all the jobs from the time they started processing + // (i.e. not including the time they wait in the streaming scheduler queue) def processingDelay = processingEndTime - processingStartTime + // Time taken to process all the jobs from the time they were submitted + // (i.e. including the time they wait in the streaming scheduler queue) def totalDelay = { processingEndTime - time.milliseconds } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 16410a21e3..fa64142096 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers -class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{ +class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { val input = (1 to 4).map(Seq(_)).toSeq val operation = (d: DStream[Int]) => d.map(x => x) |