aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-24 00:08:48 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-24 00:08:48 -0500
commit23a9ae6be33ab2721005b766615660e07eec739f (patch)
treefbc3564d1949c8517aebe613ebc09c8e79a8e627 /core/src/main/scala/org/apache
parent11107c9de524a7397ec99333f5d7b00886217781 (diff)
parent6eaa0505493511adb040257abc749fcd774bbb68 (diff)
downloadspark-23a9ae6be33ab2721005b766615660e07eec739f.tar.gz
spark-23a9ae6be33ab2721005b766615660e07eec739f.tar.bz2
spark-23a9ae6be33ab2721005b766615660e07eec739f.zip
Merge pull request #277 from tdas/scheduler-update
Refactored the streaming scheduler and added StreamingListener interface - Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler related code from spark.streaming to spark.streaming.scheduler package. - Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatusReportListener prints the batch processing time statistics (for now). Added StreamingListernerSuite to test it. - Refactored streaming TestSuiteBase for deduping code in the other streaming testsuites.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala1
2 files changed, 2 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3841b5616d..ee63b3c4a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -63,7 +63,7 @@ trait SparkListener {
* Called when a task begins remotely fetching its result (will not be called for tasks that do
* not need to fetch the result remotely).
*/
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+ def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
/**
* Called when a task ends
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
val stats = d.statCounter
- logInfo(heading + stats)
val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+ logInfo(heading + stats)
logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t"))
}
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
}
-
-
val seconds = 1000L
val minutes = seconds * 60
val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
}
-
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d5824e7954..85687ea330 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
return true
}
}
-