diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 62b521ad45..a35081f7b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -24,13 +24,16 @@ import org.apache.spark.executor.TaskMetrics sealed trait SparkListenerEvents -case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) +case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) extends SparkListenerEvents -case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents +case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents +case class SparkListenerTaskGettingResult( + task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents + case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo, taskMetrics: TaskMetrics) extends SparkListenerEvents @@ -54,7 +57,13 @@ trait SparkListener { /** * Called when a task starts */ - def onTaskStart(taskEnd: SparkListenerTaskStart) { } + def onTaskStart(taskStart: SparkListenerTaskStart) { } + + /** + * Called when a task begins remotely fetching its result (will not be called for tasks that do + * not need to fetch the result remotely). + */ + def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { } /** * Called when a task ends @@ -80,7 +89,7 @@ class StatsReportListener extends SparkListener with Logging { override def onStageCompleted(stageCompleted: StageCompleted) { import org.apache.spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted - this.logInfo("Finished stage: " + stageCompleted.stageInfo) + this.logInfo("Finished stage: " + stageCompleted.stage) showMillisDistribution("task runtime:", (info, _) => Some(info.duration)) //shuffle write @@ -93,7 +102,7 @@ class StatsReportListener extends SparkListener with Logging { //runtime breakdown - val runtimePcts = stageCompleted.stageInfo.taskInfos.map{ + val runtimePcts = stageCompleted.stage.taskInfos.map{ case (info, metrics) => RuntimePercentage(info.duration, metrics) } showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%") @@ -111,7 +120,7 @@ object StatsReportListener extends Logging { val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%" def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = { - Distribution(stage.stageInfo.taskInfos.flatMap{ + Distribution(stage.stage.taskInfos.flatMap { case ((info,metric)) => getMetric(info, metric)}) } |