aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-23 15:59:15 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-23 15:59:15 -0800
commitb31e91f927356c50d24286ba70f00fa8f6527e2f (patch)
tree7435f546812df5003d30717ac78c3047308ef570 /streaming
parent19d1d58b67a767b227e009ab8723efaa7087dd07 (diff)
parent6eaa0505493511adb040257abc749fcd774bbb68 (diff)
downloadspark-b31e91f927356c50d24286ba70f00fa8f6527e2f.tar.gz
spark-b31e91f927356c50d24286ba70f00fa8f6527e2f.tar.bz2
spark-b31e91f927356c50d24286ba70f00fa8f6527e2f.zip
Merge branch 'scheduler-update' into filestream-fix
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala18
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala10
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala2
3 files changed, 26 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af59b7..4e8d07fe92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,11 @@ import org.apache.spark.streaming.Time
/**
* Class having information on completed batches.
+ * @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch were submitted to
* the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
*/
case class BatchInfo(
batchTime: Time,
@@ -29,9 +34,22 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
def schedulingDelay = processingStartTime.map(_ - submissionTime)
+ /**
* Time taken for all the jobs of this batch to finish processing from the time they started
* processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index cf7431a8a3..57268674ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,9 +27,9 @@ private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
private val incompleteJobs = new HashSet[Job]()
- var submissionTime = System.currentTimeMillis()
- var processingStartTime = -1L
- var processingEndTime = -1L
+ var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+ var processingStartTime = -1L // when the first job of this jobset started processing
+ var processingEndTime = -1L // when the last job of this jobset finished processing
jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
incompleteJobs ++= jobs
@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def hasCompleted() = incompleteJobs.isEmpty
+ // Time taken to process all the jobs from the time they started processing
+ // (i.e. not including the time they wait in the streaming scheduler queue)
def processingDelay = processingEndTime - processingStartTime
+ // Time taken to process all the jobs from the time they were submitted
+ // (i.e. including the time they wait in the streaming scheduler queue)
def totalDelay = {
processingEndTime - time.milliseconds
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 16410a21e3..fa64142096 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
import org.scalatest.matchers.ShouldMatchers
-class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
val input = (1 to 4).map(Seq(_)).toSeq
val operation = (d: DStream[Int]) => d.map(x => x)