From a854f5bfcf4ffeefd2a74fadb695d0aa8d52a431 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 20 Oct 2013 13:18:03 -0700
Subject: SPARK-940: Do not directly pass Stage objects to SparkListener.

---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  9 ++--
 .../org/apache/spark/scheduler/JobLogger.scala     |  8 ++--
 .../org/apache/spark/scheduler/SparkListener.scala | 10 ++--
 .../org/apache/spark/scheduler/StageInfo.scala     | 13 ++++-
 .../apache/spark/ui/jobs/JobProgressListener.scala | 55 ++++++++++++----------
 .../scala/org/apache/spark/ui/jobs/PoolTable.scala |  8 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala      | 24 +++++-----
 .../apache/spark/scheduler/JobLoggerSuite.scala    |  5 +-
 .../spark/scheduler/SparkListenerSuite.scala       |  2 +-
 9 files changed, 75 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e58ff37b9b..0dec93ff83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -585,7 +585,7 @@ class DAGScheduler(
 
     // must be run listener before possible NotSerializableException
     // should be "StageSubmitted" first and then "JobEnded"
-    listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
+    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), tasks.size, properties))
 
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
@@ -606,9 +606,9 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      if (!stage.submissionTime.isDefined) {
-        stage.submissionTime = Some(System.currentTimeMillis())
-      }
+      stage.submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -636,6 +636,7 @@ class DAGScheduler(
     }
     logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
     stage.completionTime = Some(System.currentTimeMillis)
+    stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
     listenerBus.post(StageCompleted(stageToInfos(stage)))
     running -= stage
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 19c0251690..fe1990bcc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -207,13 +207,13 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-    stageLogInfo(stageSubmitted.stage.id, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
-      stageSubmitted.stage.id, stageSubmitted.taskSize))
+    stageLogInfo(stageSubmitted.stage.stageId, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+      stageSubmitted.stage.stageId, stageSubmitted.taskSize))
   }
 
   override def onStageCompleted(stageCompleted: StageCompleted) {
-    stageLogInfo(stageCompleted.stageInfo.stage.id,
-      "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
+    stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
+      stageCompleted.stage.stageId))
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 466baf9913..49faa0ba7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -24,10 +24,10 @@ import org.apache.spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
 
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+case class SparkListenerStageSubmitted(stage: StageInfo, taskSize: Int, properties: Properties)
   extends SparkListenerEvents
 
-case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -80,7 +80,7 @@ class StatsReportListener extends SparkListener with Logging {
   override def onStageCompleted(stageCompleted: StageCompleted) {
     import org.apache.spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
-    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
+    this.logInfo("Finished stage: " + stageCompleted.stage)
     showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
 
     //shuffle write
@@ -93,7 +93,7 @@ class StatsReportListener extends SparkListener with Logging {
 
     //runtime breakdown
 
-    val runtimePcts = stageCompleted.stageInfo.taskInfos.map{
+    val runtimePcts = stageCompleted.stage.taskInfos.map{
       case (info, metrics) => RuntimePercentage(info.duration, metrics)
     }
     showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
@@ -111,7 +111,7 @@ object StatsReportListener extends Logging {
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
   def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
-    Distribution(stage.stageInfo.taskInfos.flatMap{
+    Distribution(stage.stage.taskInfos.flatMap{
       case ((info,metric)) => getMetric(info, metric)})
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index b6f11969e5..3156071535 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -22,8 +22,17 @@ import scala.collection._
 import org.apache.spark.executor.TaskMetrics
 
 case class StageInfo(
-    val stage: Stage,
+    stage: Stage,
     val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
 ) {
-  override def toString = stage.rdd.toString
+  val stageId = stage.id
+  var submissionTime: Option[Long] = None
+  var completionTime: Option[Long] = None
+  val rddName = stage.rdd.toString
+  val name = stage.name
+  // TODO: We should also track the number of tasks associated with this stage, which may not
+  // be equal to numPartitions.
+  val numPartitions = stage.numPartitions
+
+  override def toString = rddName
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eb3b4e8522..c048e9b1e8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -36,13 +36,13 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
   val DEFAULT_POOL_NAME = "default"
 
-  val stageToPool = new HashMap[Stage, String]()
-  val stageToDescription = new HashMap[Stage, String]()
-  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
+  val stageToPool = new HashMap[Int, String]()
+  val stageToDescription = new HashMap[Int, String]()
+  val poolToActiveStages = new HashMap[String, HashSet[StageInfo]]()
 
-  val activeStages = HashSet[Stage]()
-  val completedStages = ListBuffer[Stage]()
-  val failedStages = ListBuffer[Stage]()
+  val activeStages = HashSet[StageInfo]()
+  val completedStages = ListBuffer[StageInfo]()
+  val failedStages = ListBuffer[StageInfo]()
 
   // Total metrics reflect metrics only for completed tasks
   var totalTime = 0L
@@ -61,27 +61,27 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
   override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
-    val stage = stageCompleted.stageInfo.stage
-    poolToActiveStages(stageToPool(stage)) -= stage
+    val stage = stageCompleted.stage
+    poolToActiveStages(stageToPool(stage.stageId)) -= stage
     activeStages -= stage
     completedStages += stage
     trimIfNecessary(completedStages)
   }
 
   /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized {
+  def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > RETAINED_STAGES) {
       val toRemove = RETAINED_STAGES / 10
       stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-        stageToTime.remove(s.id)
-        stageToShuffleRead.remove(s.id)
-        stageToShuffleWrite.remove(s.id)
-        stageToTasksActive.remove(s.id)
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
-        stageToPool.remove(s)
-        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+        stageToTaskInfos.remove(s.stageId)
+        stageToTime.remove(s.stageId)
+        stageToShuffleRead.remove(s.stageId)
+        stageToShuffleWrite.remove(s.stageId)
+        stageToTasksActive.remove(s.stageId)
+        stageToTasksComplete.remove(s.stageId)
+        stageToTasksFailed.remove(s.stageId)
+        stageToPool.remove(s.stageId)
+        if (stageToDescription.contains(s.stageId)) {stageToDescription.remove(s.stageId)}
       })
       stages.trimEnd(toRemove)
     }
@@ -95,14 +95,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     val poolName = Option(stageSubmitted.properties).map {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageToPool(stage) = poolName
+    stageToPool(stage.stageId) = poolName
 
     val description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageToDescription(stage) = d)
+    description.map(d => stageToDescription(stage.stageId) = d)
 
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
     stages += stage
   }
 
@@ -159,10 +159,15 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       case end: SparkListenerJobEnd =>
         end.jobResult match {
          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            poolToActiveStages(stageToPool(stage)) -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
+            /* If two jobs share a stage we could get this failure message twice. So we first
+             * check whether we've already retired this stage. */
+            val stageInfo = activeStages.filter(s => s.stageId == stage.id).headOption
+            stageInfo.foreach {s =>
+              activeStages -= s
+              poolToActiveStages(stageToPool(stage.id)) -= s
+              failedStages += s
+              trimIfNecessary(failedStages)
+            }
          case _ =>
        }
      case _ =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 06810d8dbc..cfeeccda41 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -21,13 +21,13 @@ import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{Schedulable, Stage}
+import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
 private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
 
-  var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+  var poolToActiveStages: HashMap[String, HashSet[StageInfo]] = listener.poolToActiveStages
 
   def toNodeSeq(): Seq[Node] = {
     listener.synchronized {
@@ -35,7 +35,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
     }
   }
 
-  private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+  private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[StageInfo]]) => Seq[Node],
     rows: Seq[Schedulable]
     ): Seq[Node] = {
@@ -53,7 +53,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
   }
 
-  private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
+  private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[StageInfo]])
     : Seq[Node] = {
     val activeStages = poolToActiveStages.get(p.name) match {
       case Some(stages) => stages.size
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 07db8622da..c47878dcd6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -22,13 +22,13 @@ import java.util.Date
 
 import scala.xml.Node
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
+import org.apache.spark.scheduler.{SchedulingMode, StageInfo, TaskInfo}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
+private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgressUI) {
 
   val listener = parent.listener
   val dateFmt = parent.dateFmt
@@ -73,40 +73,40 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
   }
 
-  private def stageRow(s: Stage): Seq[Node] = {
+  private def stageRow(s: StageInfo): Seq[Node] = {
     val submissionTime = s.submissionTime match {
       case Some(t) => dateFmt.format(new Date(t))
       case None => "Unknown"
     }
 
-    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.stageId, 0L) match {
       case 0 => ""
       case b => Utils.bytesToString(b)
     }
-    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.stageId, 0L) match {
       case 0 => ""
       case b => Utils.bytesToString(b)
     }
 
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
-    val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+    val startedTasks = listener.stageToTasksActive.getOrElse(s.stageId, HashSet[TaskInfo]()).size
+    val completedTasks = listener.stageToTasksComplete.getOrElse(s.stageId, 0)
+    val failedTasks = listener.stageToTasksFailed.getOrElse(s.stageId, 0) match {
      case f if f > 0 => "(%s failed)".format(f)
      case _ => ""
    }
     val totalTasks = s.numPartitions
 
-    val poolName = listener.stageToPool.get(s)
+    val poolName = listener.stageToPool.get(s.stageId)
 
     val nameLink =
-      {s.name}
+      {s.toString}
-    val description = listener.stageToDescription.get(s)
+    val description = listener.stageToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em><div>{nameLink}</div></div>).getOrElse(nameLink)
 
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
     val duration = s.submissionTime.map(t => finishTime - t)
 
     <tr>
-      <td>{s.id}</td>
+      <td>{s.stageId}</td>
       {if (isFairScheduler) {
         {poolName.get}}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index cece60dda7..9c3ca0bb92 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -60,8 +60,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val rootRdd = makeRdd(4, List(shuffleDep))
     val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
     val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
-
-    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
+    val rootStageInfo = new StageInfo(rootStage)
+
+    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, 4, null))
     joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a549417a47..1fe4e19e89 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -101,7 +101,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   class SaveStageInfo extends SparkListener {
     val stageInfos = mutable.Buffer[StageInfo]()
     override def onStageCompleted(stage: StageCompleted) {
-      stageInfos += stage.stageInfo
+      stageInfos += stage.stage
     }
   }
 
--
cgit v1.2.3
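
Note (not part of the patch): after this change a listener only ever receives
StageInfo, so third-party listeners can be written against stable fields such
as stageId, name, submissionTime and completionTime instead of reaching into
the scheduler-internal Stage object. A minimal sketch of such a listener
against the listener API as it stands after this patch; the class name and the
printed messages are illustrative only:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted, StageCompleted}

// Reports per-stage timing using only the fields StageInfo now exposes.
class StageTimingListener extends SparkListener {

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    println("Stage %d submitted with %d tasks".format(
      stageSubmitted.stage.stageId, stageSubmitted.taskSize))
  }

  override def onStageCompleted(stageCompleted: StageCompleted) {
    val info = stageCompleted.stage
    // submissionTime and completionTime are filled in by the DAGScheduler, so
    // the duration is only defined once both timestamps are present.
    val duration = for {
      start <- info.submissionTime
      end <- info.completionTime
    } yield end - start
    println("Stage %d (%s) finished in %s ms".format(
      info.stageId, info.name, duration.map(_.toString).getOrElse("unknown")))
  }
}

Registered on a live context with sc.addSparkListener(new StageTimingListener),
this prints one line per stage submission and one per stage completion.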