diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-24 00:08:48 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-24 00:08:48 -0500 |
commit | 23a9ae6be33ab2721005b766615660e07eec739f (patch) | |
tree | fbc3564d1949c8517aebe613ebc09c8e79a8e627 /core | |
parent | 11107c9de524a7397ec99333f5d7b00886217781 (diff) | |
parent | 6eaa0505493511adb040257abc749fcd774bbb68 (diff) | |
download | spark-23a9ae6be33ab2721005b766615660e07eec739f.tar.gz spark-23a9ae6be33ab2721005b766615660e07eec739f.tar.bz2 spark-23a9ae6be33ab2721005b766615660e07eec739f.zip |
Merge pull request #277 from tdas/scheduler-update
Refactored the streaming scheduler and added StreamingListener interface
- Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler related code from spark.streaming to spark.streaming.scheduler package.
- Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatusReportListener prints the batch processing time statistics (for now). Added StreamingListernerSuite to test it.
- Refactored streaming TestSuiteBase for deduping code in the other streaming testsuites.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 1 |
2 files changed, 2 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 3841b5616d..ee63b3c4a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -63,7 +63,7 @@ trait SparkListener { * Called when a task begins remotely fetching its result (will not be called for tasks that do * not need to fetch the result remotely). */ - def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { } + def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { } /** * Called when a task ends @@ -131,8 +131,8 @@ object StatsReportListener extends Logging { def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) { val stats = d.statCounter - logInfo(heading + stats) val quantiles = d.getQuantiles(probabilities).map{formatNumber} + logInfo(heading + stats) logInfo(percentilesHeader) logInfo("\t" + quantiles.mkString("\t")) } @@ -173,8 +173,6 @@ object StatsReportListener extends Logging { showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) } - - val seconds = 1000L val minutes = seconds * 60 val hours = minutes * 60 @@ -198,7 +196,6 @@ object StatsReportListener extends Logging { } - case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index d5824e7954..85687ea330 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging { return true
}
}
-
|