From 29b79714f94cda0db0b56746b29cf60830d076ad Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Fri, 9 Aug 2013 16:46:34 -0700
Subject: Style fixes based on code review

---
 .../src/main/scala/spark/scheduler/JobLogger.scala |   3 +-
 .../scheduler/SparkListenerEventProcessor.scala    |  26 ++-
 .../scala/spark/ui/jobs/JobProgressListener.scala  | 213 +++++++++------------
 3 files changed, 110 insertions(+), 132 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 7ceb2f22f6..7194fcaa49 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -23,10 +23,11 @@ import java.io.FileNotFoundException
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
+
 import scala.collection.mutable.{Map, HashMap, ListBuffer}
 import scala.io.Source
+
 import spark._
-import spark.SparkContext
 import spark.executor.TaskMetrics
 import spark.scheduler.cluster.TaskInfo

diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
index 6d21c9d836..43864c711f 100644
--- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
@@ -17,16 +17,20 @@

 package spark.scheduler

-import scala.collection.mutable.ArrayBuffer
-
 import java.util.concurrent.LinkedBlockingQueue

+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import spark.Logging
+
 /** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-class SparkListenerEventProcessor() {
-  /* sparkListeners is not thread safe, so this assumes that listeners are all added before any
-   * SparkListenerEvents occur. */
-  private val sparkListeners = ArrayBuffer[SparkListener]()
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+class SparkListenerEventProcessor() extends Logging {
+  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)

   new Thread("SparkListenerEventProcessor") {
     setDaemon(true)
@@ -57,6 +61,12 @@ class SparkListenerEventProcessor() {
   }

   def addEvent(event: SparkListenerEvents) {
-    eventQueue.put(event)
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+    }
   }
 }
+
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index ee6468dc9a..8886951d1f 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,7 +9,8 @@ import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
 import collection.mutable

-/** Tracks task-level information to be displayed in the UI.
+/**
+ * Tracks task-level information to be displayed in the UI.
  *
  * All access to the data structures in this class must be synchronized on the
  * class, since the UI thread and the DAGScheduler event loop may otherwise
@@ -44,146 +45,112 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList

   override def onJobStart(jobStart: SparkListenerJobStart) {}

-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    this.synchronized {
-      val stage = stageCompleted.stageInfo.stage
-      poolToActiveStages(stageToPool(stage)) -= stage
-      activeStages -= stage
-      completedStages += stage
-      trimIfNecessary(completedStages)
-    }
+  override def onStageCompleted(stageCompleted: StageCompleted) : Unit = synchronized {
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
   }

   /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) {
-    this.synchronized {
-      if (stages.size > RETAINED_STAGES) {
-        val toRemove = RETAINED_STAGES / 10
-        stages.takeRight(toRemove).foreach( s => {
-          stageToTaskInfos.remove(s.id)
-          stageToTime.remove(s.id)
-          stageToShuffleRead.remove(s.id)
-          stageToShuffleWrite.remove(s.id)
-          stageToTasksActive.remove(s.id)
-          stageToTasksComplete.remove(s.id)
-          stageToTasksFailed.remove(s.id)
-          stageToPool.remove(s)
-          if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
-        })
-        stages.trimEnd(toRemove)
-      }
+  def trimIfNecessary(stages: ListBuffer[Stage]): Unit = synchronized {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
+        stageToPool.remove(s)
+        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+      })
+      stages.trimEnd(toRemove)
     }
   }

   /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
-    this.synchronized {
-      val stage = stageSubmitted.stage
-      activeStages += stage
-
-      val poolName = Option(stageSubmitted.properties).map {
-        p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
-      }.getOrElse(DEFAULT_POOL_NAME)
-      stageToPool(stage) = poolName
-
-      val description = Option(stageSubmitted.properties).flatMap {
-        p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
-      }
-      description.map(d => stageToDescription(stage) = d)
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) : Unit = synchronized {
+    val stage = stageSubmitted.stage
+    activeStages += stage

-      val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
-      stages += stage
+    val poolName = Option(stageSubmitted.properties).map {
+      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }.getOrElse(DEFAULT_POOL_NAME)
+    stageToPool(stage) = poolName
+
+    val description = Option(stageSubmitted.properties).flatMap {
+      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
+    description.map(d => stageToDescription(stage) = d)
+
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
   }

-  override def onTaskStart(taskStart: SparkListenerTaskStart) {
-    this.synchronized {
-      val sid = taskStart.task.stageId
-      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-      tasksActive += taskStart.taskInfo
-      val taskList = stageToTaskInfos.getOrElse(
-        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList += ((taskStart.taskInfo, None, None))
-      stageToTaskInfos(sid) = taskList
-    }
+  override def onTaskStart(taskStart: SparkListenerTaskStart) : Unit = synchronized {
+    val sid = taskStart.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskStart.taskInfo, None, None))
+    stageToTaskInfos(sid) = taskList
   }

-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    this.synchronized {
-      val sid = taskEnd.task.stageId
-      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-      tasksActive -= taskEnd.taskInfo
-      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-        taskEnd.reason match {
-          case e: ExceptionFailure =>
-            stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-            (Some(e), e.metrics)
-          case _ =>
-            stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-            (None, Option(taskEnd.taskMetrics))
-        }
-
-      stageToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-      stageToTime(sid) += time
-      totalTime += time
-
-      stageToShuffleRead.getOrElseUpdate(sid, 0L)
-      val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-        s.remoteBytesRead).getOrElse(0L)
-      stageToShuffleRead(sid) += shuffleRead
-      totalShuffleRead += shuffleRead
-
-      stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-      val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-        s.shuffleBytesWritten).getOrElse(0L)
-      stageToShuffleWrite(sid) += shuffleWrite
-      totalShuffleWrite += shuffleWrite
-
-      val taskList = stageToTaskInfos.getOrElse(
-        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList -= ((taskEnd.taskInfo, None, None))
-      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-      stageToTaskInfos(sid) = taskList
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    this.synchronized {
-      jobEnd match {
-        case end: SparkListenerJobEnd =>
-          end.jobResult match {
-            case JobFailed(ex, Some(stage)) =>
-              activeStages -= stage
-              poolToActiveStages(stageToPool(stage)) -= stage
-              failedStages += stage
-              trimIfNecessary(failedStages)
-            case _ =>
-          }
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) : Unit = synchronized {
+    val sid = taskEnd.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
         case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
       }
-    }
-  }
-  /** Is this stage's input from a shuffle read. */
-  def hasShuffleRead(stageID: Int): Boolean = {
-    this.synchronized {
-      // This is written in a slightly complicated way to avoid having to scan all tasks
-      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-        if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
-      }
-      return false // No tasks have finished for this stage
-    }
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList -= ((taskEnd.taskInfo, None, None))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
   }

-  /** Is this stage's output to a shuffle write. */
-  def hasShuffleWrite(stageID: Int): Boolean = {
-    this.synchronized {
-      // This is written in a slightly complicated way to avoid having to scan all tasks
-      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-        if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
-      }
-      return false // No tasks have finished for this stage
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) : Unit = synchronized {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
     }
   }
 }
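
A note for readers of the SparkListenerEventProcessor hunk above: the change combines a listener list that is safe to mutate while events are flowing (SynchronizedBuffer) with a bounded event queue drained by a single daemon thread, where a full queue is reported and the event dropped (offer) rather than blocking the caller (put). The sketch below is a minimal, standalone illustration of that pattern, not the Spark class itself; BoundedEventBus, Event, addListener and post are hypothetical names, and SynchronizedBuffer is used only to match the 2013-era Scala collections in the patch (it is deprecated in later Scala versions).

    import java.util.concurrent.LinkedBlockingQueue

    import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

    // Illustrative stand-in event type; not part of the patch above.
    case class Event(name: String)

    class BoundedEventBus(capacity: Int = 10000) {
      // Listeners live in a synchronized buffer, so they can be registered even after
      // events have started flowing (the patch makes the same change for SparkListeners).
      private val listeners = new ArrayBuffer[Event => Unit]() with SynchronizedBuffer[Event => Unit]

      // Bounded queue: a full queue shows up as dropped events rather than unbounded memory growth.
      private val queue = new LinkedBlockingQueue[Event](capacity)

      // Single daemon consumer thread, mirroring the structure of the processor above.
      new Thread("bounded-event-bus") {
        setDaemon(true)
        override def run() {
          while (true) {
            val event = queue.take()
            listeners.foreach(_(event))
          }
        }
      }.start()

      def addListener(listener: Event => Unit) {
        listeners += listener
      }

      // offer() returns false instead of blocking when the queue is full.
      def post(event: Event) {
        if (!queue.offer(event)) {
          System.err.println("Dropping " + event + ": queue is full; listeners cannot keep up.")
        }
      }
    }

The trade-off behind offer() is deliberate: with the previous unbounded queue, a slow listener let memory grow without limit, while a bounded queue using put() would stall whichever thread posts events. Dropping the event and logging an error confines the damage to lost UI data.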
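
The JobProgressListener changes above are largely mechanical: each handler that wrapped its body in a this.synchronized { ... } block now declares its result type and makes the entire method body the synchronized block, removing one level of nesting. A small hypothetical before-and-after sketch of that shape (StageCounter and its fields are illustrative, not taken from the patch):

    import scala.collection.mutable.HashMap

    // Hypothetical class, purely to illustrate the style change; not from the patch.
    class StageCounter {
      private val completedByStage = new HashMap[Int, Int]()

      // Before (the shape being removed): procedure syntax plus an explicit this.synchronized block.
      // def onStageCompleted(stageId: Int) {
      //   this.synchronized {
      //     completedByStage(stageId) = completedByStage.getOrElse(stageId, 0) + 1
      //   }
      // }

      // After (the shape being introduced): an explicit result type, with the whole method body
      // as the synchronized block.
      def onStageCompleted(stageId: Int): Unit = synchronized {
        completedByStage(stageId) = completedByStage.getOrElse(stageId, 0) + 1
      }

      def completedTasks(stageId: Int): Int = synchronized {
        completedByStage.getOrElse(stageId, 0)
      }
    }

Both forms lock on the listener instance itself, so the rewrite changes readability rather than the locking behaviour.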