aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-23 11:30:42 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-23 11:30:42 -0800
commitdc3ee6b6122229cd99a133baf10a46dac2f7e9e2 (patch)
tree670d94a467925db6db38c30bf0aecb215ded342b /streaming
parent3ddbdbfbc71486cd5076d875f82796a880d2dccb (diff)
downloadspark-dc3ee6b6122229cd99a133baf10a46dac2f7e9e2.tar.gz
spark-dc3ee6b6122229cd99a133baf10a46dac2f7e9e2.tar.bz2
spark-dc3ee6b6122229cd99a133baf10a46dac2f7e9e2.zip
Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala19
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala10
2 files changed, 26 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af59b7..e3fb07624e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time
/**
* Class having information on completed batches.
+ * @param batchTime Time of the batch
+ * @param submissionTime Clock time of when jobs of this batch was submitted to
+ * the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ *
*/
case class BatchInfo(
batchTime: Time,
@@ -29,9 +35,22 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
def schedulingDelay = processingStartTime.map(_ - submissionTime)
+ /**
+ * Time taken for the all jobs of this batch to finish processing from the time they started
+ * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index cf7431a8a3..57268674ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,9 +27,9 @@ private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
private val incompleteJobs = new HashSet[Job]()
- var submissionTime = System.currentTimeMillis()
- var processingStartTime = -1L
- var processingEndTime = -1L
+ var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+ var processingStartTime = -1L // when the first job of this jobset started processing
+ var processingEndTime = -1L // when the last job of this jobset finished processing
jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
incompleteJobs ++= jobs
@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def hasCompleted() = incompleteJobs.isEmpty
+ // Time taken to process all the jobs from the time they started processing
+ // (i.e. not including the time they wait in the streaming scheduler queue)
def processingDelay = processingEndTime - processingStartTime
+ // Time taken to process all the jobs from the time they were submitted
+ // (i.e. including the time they wait in the streaming scheduler queue)
def totalDelay = {
processingEndTime - time.milliseconds
}