author     Kay Ousterhout <kayo@yahoo-inc.com>    2013-08-09 16:46:34 -0700
committer  Kay Ousterhout <kayo@yahoo-inc.com>    2013-08-09 16:46:34 -0700
commit     29b79714f94cda0db0b56746b29cf60830d076ad (patch)
tree       7ee56700966970e71c205d7f4795545d1e1e1910 /core
parent     81e1d4a7d19cefe4bbf0fe6ec54b4f894d7b21d0 (diff)
Style fixes based on code review
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala                       3
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala    26
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala             213
3 files changed, 110 insertions(+), 132 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 7ceb2f22f6..7194fcaa49 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -23,10 +23,11 @@ import java.io.FileNotFoundException
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue
+
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
+
import spark._
-import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
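
For reference, the net effect of the hunk above is purely organizational: the imports are regrouped into java, scala, and spark blocks separated by blank lines, and the explicit spark.SparkContext import is dropped because spark._ already covers it. The resulting header (showing only the imports visible in this hunk) reads:

import java.io.FileNotFoundException
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source

import spark._
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo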
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
index 6d21c9d836..43864c711f 100644
--- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
@@ -17,16 +17,20 @@
package spark.scheduler
-import scala.collection.mutable.ArrayBuffer
-
import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import spark.Logging
+
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-class SparkListenerEventProcessor() {
- /* sparkListeners is not thread safe, so this assumes that listeners are all added before any
- * SparkListenerEvents occur. */
- private val sparkListeners = ArrayBuffer[SparkListener]()
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+class SparkListenerEventProcessor() extends Logging {
+ private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+
+ /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+ * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
new Thread("SparkListenerEventProcessor") {
setDaemon(true)
@@ -57,6 +61,12 @@ class SparkListenerEventProcessor() {
}
def addEvent(event: SparkListenerEvents) {
- eventQueue.put(event)
+ val eventAdded = eventQueue.offer(event)
+ if (!eventAdded) {
+ logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+ "rate at which tasks are being started by the scheduler.")
+ }
}
}
+
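
The substantive change in this file is the switch from a blocking put to a non-blocking offer against a capacity-bounded queue, plus mixing in SynchronizedBuffer so listeners can be registered safely at any time. The sketch below is a minimal, self-contained version of that producer/consumer pattern; the String event type, thread name, and messages are illustrative stand-ins, and only LinkedBlockingQueue with offer/take mirrors the patch.

import java.util.concurrent.LinkedBlockingQueue

object BoundedEventQueueSketch {
  // Bounded capacity: a persistently full queue surfaces as dropped events
  // rather than as an eventual OutOfMemoryError from unbounded buffering.
  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[String](EVENT_QUEUE_CAPACITY)

  // Daemon consumer thread: take() blocks while the queue is empty, and the
  // daemon flag keeps this thread from preventing JVM shutdown.
  new Thread("event-consumer") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = eventQueue.take()
        println("handling " + event)
      }
    }
  }.start()

  def addEvent(event: String) {
    // offer() returns false immediately when the queue is full, so a slow
    // consumer costs us an event (and an error message) instead of blocking
    // the producer or growing the heap without bound.
    if (!eventQueue.offer(event)) {
      System.err.println("Dropping " + event + ": queue full, consumer too slow")
    }
  }

  def main(args: Array[String]) {
    (1 to 5).foreach(i => addEvent("event-" + i))
    Thread.sleep(100) // give the daemon thread a moment to drain the queue
  }
}

The capacity of 10000 matches EVENT_QUEUE_CAPACITY in the patch; in practice the right bound depends on how bursty event production is relative to the slowest listener.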
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index ee6468dc9a..8886951d1f 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,7 +9,8 @@ import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
-/** Tracks task-level information to be displayed in the UI.
+/**
+ * Tracks task-level information to be displayed in the UI.
*
* All access to the data structures in this class must be synchronized on the
* class, since the UI thread and the DAGScheduler event loop may otherwise
@@ -44,146 +45,112 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {}
- override def onStageCompleted(stageCompleted: StageCompleted) = {
- this.synchronized {
- val stage = stageCompleted.stageInfo.stage
- poolToActiveStages(stageToPool(stage)) -= stage
- activeStages -= stage
- completedStages += stage
- trimIfNecessary(completedStages)
- }
+ override def onStageCompleted(stageCompleted: StageCompleted) : Unit = synchronized {
+ val stage = stageCompleted.stageInfo.stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ activeStages -= stage
+ completedStages += stage
+ trimIfNecessary(completedStages)
}
/** If stages is too large, remove and garbage collect old stages */
- def trimIfNecessary(stages: ListBuffer[Stage]) {
- this.synchronized {
- if (stages.size > RETAINED_STAGES) {
- val toRemove = RETAINED_STAGES / 10
- stages.takeRight(toRemove).foreach( s => {
- stageToTaskInfos.remove(s.id)
- stageToTime.remove(s.id)
- stageToShuffleRead.remove(s.id)
- stageToShuffleWrite.remove(s.id)
- stageToTasksActive.remove(s.id)
- stageToTasksComplete.remove(s.id)
- stageToTasksFailed.remove(s.id)
- stageToPool.remove(s)
- if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
- })
- stages.trimEnd(toRemove)
- }
+ def trimIfNecessary(stages: ListBuffer[Stage]): Unit = synchronized {
+ if (stages.size > RETAINED_STAGES) {
+ val toRemove = RETAINED_STAGES / 10
+ stages.takeRight(toRemove).foreach( s => {
+ stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
+ stageToPool.remove(s)
+ if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+ })
+ stages.trimEnd(toRemove)
}
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
- this.synchronized {
- val stage = stageSubmitted.stage
- activeStages += stage
-
- val poolName = Option(stageSubmitted.properties).map {
- p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
- }.getOrElse(DEFAULT_POOL_NAME)
- stageToPool(stage) = poolName
-
- val description = Option(stageSubmitted.properties).flatMap {
- p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
- }
- description.map(d => stageToDescription(stage) = d)
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) : Unit = synchronized {
+ val stage = stageSubmitted.stage
+ activeStages += stage
- val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
- stages += stage
+ val poolName = Option(stageSubmitted.properties).map {
+ p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ }.getOrElse(DEFAULT_POOL_NAME)
+ stageToPool(stage) = poolName
+
+ val description = Option(stageSubmitted.properties).flatMap {
+ p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
+ description.map(d => stageToDescription(stage) = d)
+
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+ stages += stage
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
- this.synchronized {
- val sid = taskStart.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive += taskStart.taskInfo
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- stageToTaskInfos(sid) = taskList
- }
+ override def onTaskStart(taskStart: SparkListenerTaskStart) : Unit = synchronized {
+ val sid = taskStart.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- this.synchronized {
- val sid = taskEnd.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive -= taskEnd.taskInfo
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
- taskEnd.reason match {
- case e: ExceptionFailure =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case _ =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Option(taskEnd.taskMetrics))
- }
-
- stageToTime.getOrElseUpdate(sid, 0L)
- val time = metrics.map(m => m.executorRunTime).getOrElse(0)
- stageToTime(sid) += time
- totalTime += time
-
- stageToShuffleRead.getOrElseUpdate(sid, 0L)
- val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
- s.remoteBytesRead).getOrElse(0L)
- stageToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
-
- stageToShuffleWrite.getOrElseUpdate(sid, 0L)
- val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
- s.shuffleBytesWritten).getOrElse(0L)
- stageToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
-
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- stageToTaskInfos(sid) = taskList
- }
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- this.synchronized {
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- activeStages -= stage
- poolToActiveStages(stageToPool(stage)) -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
- case _ =>
- }
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) : Unit = synchronized {
+ val sid = taskEnd.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ (Some(e), e.metrics)
case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ (None, Option(taskEnd.taskMetrics))
}
- }
- }
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- this.synchronized {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ stageToTaskInfos(sid) = taskList
}
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- this.synchronized {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) : Unit = synchronized {
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
+ case _ =>
}
}
}
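
Beyond moving the trim/bookkeeping code around, nearly all of this file's churn is one mechanical rewrite: methods that wrapped their entire body in this.synchronized { ... } become def f(...): Unit = synchronized { ... }. Both forms lock on this (so the UI thread and the DAGScheduler event loop still exclude each other, per the class comment), but the second avoids a level of nesting. A minimal sketch of the idiom, where BoundedLog and its fields are hypothetical and only the locking pattern and the drop-10%-when-over-capacity policy mirror the diff:

import scala.collection.mutable.ListBuffer

class BoundedLog(retainedEntries: Int) {
  private val entries = ListBuffer[String]()

  // Method body as the synchronized block: equivalent to wrapping the body
  // in this.synchronized { ... }, with one less level of indentation.
  def add(entry: String): Unit = synchronized {
    entries += entry
    trimIfNecessary()
  }

  def size: Int = synchronized { entries.size }

  // Mirrors trimIfNecessary above: once over the cap, drop a tenth of the
  // retained entries in one batch instead of trimming on every insert.
  // JVM monitors are re-entrant, so calling this from add() is safe.
  private def trimIfNecessary(): Unit = synchronized {
    if (entries.size > retainedEntries) {
      entries.trimEnd(retainedEntries / 10)
    }
  }
}

This idiom only works when the whole method body needs the lock; anything finer-grained still calls for an explicit synchronized block inside the method.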