From dc3ee6b6122229cd99a133baf10a46dac2f7e9e2 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 23 Dec 2013 11:30:42 -0800
Subject: Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277.

---
 .../apache/spark/streaming/scheduler/BatchInfo.scala  | 19 +++++++++++++++++++
 .../org/apache/spark/streaming/scheduler/JobSet.scala | 10 +++++++---
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af59b7..e3fb07624e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time
 
 /**
  * Class having information on completed batches.
+ * @param batchTime Time of the batch
+ * @param submissionTime Clock time of when jobs of this batch were submitted to
+ *                       the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ *
  */
 case class BatchInfo(
     batchTime: Time,
@@ -29,9 +35,22 @@ case class BatchInfo(
     processingEndTime: Option[Long]
   ) {
 
+  /**
+   * Time taken for the first job of this batch to start processing from the time this batch
+   * was submitted to the streaming scheduler. Essentially, it is
+   * `processingStartTime` - `submissionTime`.
+   */
   def schedulingDelay = processingStartTime.map(_ - submissionTime)
 
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they started
+   * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+   */
   def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
 
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they
+   * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+   */
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index cf7431a8a3..57268674ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,9 +27,9 @@ private[streaming]
 case class JobSet(time: Time, jobs: Seq[Job]) {
 
   private val incompleteJobs = new HashSet[Job]()
-  var submissionTime = System.currentTimeMillis()
-  var processingStartTime = -1L
-  var processingEndTime = -1L
+  var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+  var processingStartTime = -1L // when the first job of this jobset started processing
+  var processingEndTime = -1L // when the last job of this jobset finished processing
 
   jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
   incompleteJobs ++= jobs
@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
 
   def hasCompleted() = incompleteJobs.isEmpty
 
+  // Time taken to process all the jobs from the time they started processing
+  // (i.e. not including the time they wait in the streaming scheduler queue)
   def processingDelay = processingEndTime - processingStartTime
 
+  // Time taken to process all the jobs from the time they were submitted
+  // (i.e. including the time they wait in the streaming scheduler queue)
   def totalDelay = {
     processingEndTime - time.milliseconds
   }
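
For reference, here is a minimal standalone sketch of the delay arithmetic these new comments document: schedulingDelay is queue wait, processingDelay is run time, and totalDelay is their sum. BatchTimes and DelayExample are hypothetical names introduced only for this illustration; the real BatchInfo lives in org.apache.spark.streaming.scheduler and uses Spark's Time class rather than plain Longs.

    // Hypothetical stand-in for BatchInfo, using plain Long millisecond
    // timestamps; not part of the patch above.
    case class BatchTimes(
        submissionTime: Long,
        processingStartTime: Option[Long],
        processingEndTime: Option[Long]) {

      // Queue wait: processingStartTime - submissionTime
      def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)

      // Run time: processingEndTime - processingStartTime
      def processingDelay: Option[Long] =
        for (end <- processingEndTime; start <- processingStartTime) yield end - start

      // End-to-end: schedulingDelay + processingDelay
      //           = processingEndTime - submissionTime
      def totalDelay: Option[Long] =
        for (s <- schedulingDelay; p <- processingDelay) yield s + p
    }

    object DelayExample extends App {
      val batch = BatchTimes(
        submissionTime = 1000L,
        processingStartTime = Some(1200L),
        processingEndTime = Some(1700L))

      assert(batch.schedulingDelay == Some(200L)) // 1200 - 1000
      assert(batch.processingDelay == Some(500L)) // 1700 - 1200
      assert(batch.totalDelay == Some(700L))      // 200 + 500
      println((batch.schedulingDelay, batch.processingDelay, batch.totalDelay))
    }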