diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 15:59:15 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 15:59:15 -0800 |
commit | b31e91f927356c50d24286ba70f00fa8f6527e2f (patch) | |
tree | 7435f546812df5003d30717ac78c3047308ef570 /streaming/src/main | |
parent | 19d1d58b67a767b227e009ab8723efaa7087dd07 (diff) | |
parent | 6eaa0505493511adb040257abc749fcd774bbb68 (diff) | |
download | spark-b31e91f927356c50d24286ba70f00fa8f6527e2f.tar.gz spark-b31e91f927356c50d24286ba70f00fa8f6527e2f.tar.bz2 spark-b31e91f927356c50d24286ba70f00fa8f6527e2f.zip |
Merge branch 'scheduler-update' into filestream-fix
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala | 18 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 10 |
2 files changed, 25 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 88e4af59b7..4e8d07fe92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -21,6 +21,11 @@ import org.apache.spark.streaming.Time /** * Class having information on completed batches. + * @param batchTime Time of the batch + * @param submissionTime Clock time of when jobs of this batch were submitted to + * the streaming scheduler queue + * @param processingStartTime Clock time of when the first job of this batch started processing + * @param processingEndTime Clock time of when the last job of this batch finished processing */ case class BatchInfo( batchTime: Time, @@ -29,9 +34,22 @@ case class BatchInfo( processingEndTime: Option[Long] ) { + /** + * Time taken for the first job of this batch to start processing from the time this batch + * was submitted to the streaming scheduler. Essentially, it is + * `processingStartTime` - `submissionTime`. + */ def schedulingDelay = processingStartTime.map(_ - submissionTime) + /** + * Time taken for all the jobs of this batch to finish processing from the time they started + * processing. Essentially, it is `processingEndTime` - `processingStartTime`. + */ def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption + /** + * Time taken for all the jobs of this batch to finish processing from the time they + * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */ def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index cf7431a8a3..57268674ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -27,9 +27,9 @@ private[streaming] case class JobSet(time: Time, jobs: Seq[Job]) { private val incompleteJobs = new HashSet[Job]() - var submissionTime = System.currentTimeMillis() - var processingStartTime = -1L - var processingEndTime = -1L + var submissionTime = System.currentTimeMillis() // when this jobset was submitted + var processingStartTime = -1L // when the first job of this jobset started processing + var processingEndTime = -1L // when the last job of this jobset finished processing jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) } incompleteJobs ++= jobs @@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def hasCompleted() = incompleteJobs.isEmpty + // Time taken to process all the jobs from the time they started processing + // (i.e. not including the time they wait in the streaming scheduler queue) def processingDelay = processingEndTime - processingStartTime + // Time taken to process all the jobs from the time they were submitted + // (i.e. including the time they wait in the streaming scheduler queue) def totalDelay = { processingEndTime - time.milliseconds } |