diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index ee63b3c4a1..627995c826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -27,7 +27,7 @@ sealed trait SparkListenerEvents case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) extends SparkListenerEvents -case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents +case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents @@ -47,7 +47,7 @@ trait SparkListener { /** * Called when a stage is completed, with information on the completed stage */ - def onStageCompleted(stageCompleted: StageCompleted) { } + def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { } /** * Called when a stage is submitted @@ -86,7 +86,7 @@ trait SparkListener { * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { - override def onStageCompleted(stageCompleted: StageCompleted) { + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { import org.apache.spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted this.logInfo("Finished stage: " + stageCompleted.stage) @@ -119,13 +119,17 @@ object StatsReportListener extends Logging { val probabilities = percentiles.map{_ / 100.0} val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%" - def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = { + def extractDoubleDistribution(stage: SparkListenerStageCompleted, + getMetric: (TaskInfo,TaskMetrics) => Option[Double]) + : Option[Distribution] = { Distribution(stage.stage.taskInfos.flatMap { case ((info,metric)) => getMetric(info, metric)}) } //is there some way to setup the types that I can get rid of this completely? - def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = { + def extractLongDistribution(stage: SparkListenerStageCompleted, + getMetric: (TaskInfo,TaskMetrics) => Option[Long]) + : Option[Distribution] = { extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble}) } @@ -147,12 +151,12 @@ object StatsReportListener extends Logging { } def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showDistribution(heading, extractDoubleDistribution(stage, getMetric), format) } def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showBytesDistribution(heading, extractLongDistribution(stage, getMetric)) } @@ -169,7 +173,7 @@ object StatsReportListener extends Logging { } def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) } |