author    Tathagata Das <tathagata.das1565@gmail.com>  2013-12-23 17:49:41 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-12-23 17:49:41 -0800
commit    e9165d2a391c73d7e06436426047759aa62807c2
tree      e7f7d162c951e5f111778f2a00635bc1ce6dc278 /streaming
parent    61f4bbda0d4e3ecbd8b955232a741231936a25de
parent    6eaa0505493511adb040257abc749fcd774bbb68
Merge branch 'scheduler-update' into window-improvement
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala    | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala       | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala |  2
4 files changed, 32 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af59b7..4e8d07fe92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,11 @@ import org.apache.spark.streaming.Time
/**
* Class having information on completed batches.
+ * @param batchTime Time of the batch
+ * @param submissionTime Clock time of when jobs of this batch were submitted to
+ * the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
*/
case class BatchInfo(
batchTime: Time,
@@ -29,9 +34,22 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
def schedulingDelay = processingStartTime.map(_ - submissionTime)
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they started
+ * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
}
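
To illustrate the accessors added above, here is a minimal sketch of how they compose. The timestamps are made up, and the snippet assumes the BatchInfo and Time classes from this commit are on the classpath:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

// Hypothetical batch: submitted at t=1000 ms, first job started at
// t=1200 ms, last job finished at t=1700 ms.
val info = BatchInfo(
  batchTime = Time(1000),
  submissionTime = 1000L,
  processingStartTime = Some(1200L),
  processingEndTime = Some(1700L)
)

info.schedulingDelay  // Some(200): processingStartTime - submissionTime
info.processingDelay  // Some(500): processingEndTime - processingStartTime
info.totalDelay       // Some(700): schedulingDelay + processingDelay

// Before the first job starts, all three delays are undefined:
BatchInfo(Time(2000), 2000L, None, None).totalDelay  // None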
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 33c5322358..9511ccfbed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.HashSet
import org.apache.spark.streaming._
/**
- * This class drives the generation of Spark jobs from the DStreams.
+ * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
+ * the jobs and runs them using a thread pool, sized by the `spark.streaming.concurrentJobs` property.
*/
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -91,6 +92,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}
+ private[streaming]
class JobHandler(job: Job) extends Runnable {
def run() {
beforeJobStart(job)
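
The updated class comment describes a thread-pool scheduler; below is a standalone sketch of that pattern, not the actual JobScheduler. The pool size of 1 is an assumed default, mirroring the `spark.streaming.concurrentJobs` property:

import java.util.concurrent.Executors

// Jobs are wrapped in Runnables and handed to a fixed-size pool, so the
// jobs of up to numConcurrentJobs batches can run at the same time.
val numConcurrentJobs = 1  // assumption: default of spark.streaming.concurrentJobs
val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

def runJob(body: => Unit): Unit = jobExecutor.execute(new Runnable {
  def run(): Unit = {
    // beforeJobStart / afterJobEnd bookkeeping would bracket the job here
    body
  }
})

runJob { println("processing one batch's job") }
jobExecutor.shutdown()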
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 05233d095b..57268674ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -20,13 +20,16 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.HashSet
import org.apache.spark.streaming.Time
+/** Class representing a set of Jobs
+ * that belong to the same batch.
+ */
private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
private val incompleteJobs = new HashSet[Job]()
- var submissionTime = System.currentTimeMillis()
- var processingStartTime = -1L
- var processingEndTime = -1L
+ var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+ var processingStartTime = -1L // when the first job of this jobset started processing
+ var processingEndTime = -1L // when the last job of this jobset finished processing
jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
incompleteJobs ++= jobs
@@ -44,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def hasCompleted() = incompleteJobs.isEmpty
+ // Time taken to process all the jobs from the time they started processing
+ // (i.e. not including the time they wait in the streaming scheduler queue)
def processingDelay = processingEndTime - processingStartTime
+ // Time taken to process all the jobs from the time they were submitted
+ // (i.e. including the time they wait in the streaming scheduler queue)
def totalDelay = {
processingEndTime - time.milliseconds
}
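
To make the two delay definitions above concrete, here is a toy model with millisecond timestamps. The field names mirror JobSet's, but this is a hypothetical stand-in, not the actual private[streaming] class:

// Batch nominally due at t=0; its jobs waited in the scheduler queue
// until t=300, and the last one finished at t=800.
case class JobSetTiming(batchTimeMs: Long,
                        processingStartTime: Long,
                        processingEndTime: Long) {
  // Pure execution time, excluding the wait in the scheduler queue
  def processingDelay: Long = processingEndTime - processingStartTime
  // End-to-end latency, measured from the batch's nominal time
  def totalDelay: Long = processingEndTime - batchTimeMs
}

val timing = JobSetTiming(batchTimeMs = 0L, processingStartTime = 300L, processingEndTime = 800L)
timing.processingDelay  // 500 ms spent running jobs
timing.totalDelay       // 800 ms from batch time to completion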
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 16410a21e3..fa64142096 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
import org.scalatest.matchers.ShouldMatchers
-class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
val input = (1 to 4).map(Seq(_)).toSeq
val operation = (d: DStream[Int]) => d.map(x => x)