diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala index 8886951d1f..1d9767a83c 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala @@ -45,7 +45,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList override def onJobStart(jobStart: SparkListenerJobStart) {} - override def onStageCompleted(stageCompleted: StageCompleted) : Unit = synchronized { + override def onStageCompleted(stageCompleted: StageCompleted) = synchronized { val stage = stageCompleted.stageInfo.stage poolToActiveStages(stageToPool(stage)) -= stage activeStages -= stage @@ -54,7 +54,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } /** If stages is too large, remove and garbage collect old stages */ - def trimIfNecessary(stages: ListBuffer[Stage]): Unit = synchronized { + def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized { if (stages.size > RETAINED_STAGES) { val toRemove = RETAINED_STAGES / 10 stages.takeRight(toRemove).foreach( s => { @@ -73,7 +73,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */ - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) : Unit = synchronized { + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stage activeStages += stage @@ -91,7 +91,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stages += stage } - override def onTaskStart(taskStart: SparkListenerTaskStart) : Unit = synchronized { + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val sid = taskStart.task.stageId val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive += taskStart.taskInfo @@ -101,7 +101,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageToTaskInfos(sid) = taskList } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) : Unit = synchronized { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val sid = taskEnd.task.stageId val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo @@ -139,7 +139,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageToTaskInfos(sid) = taskList } - override def onJobEnd(jobEnd: SparkListenerJobEnd) : Unit = synchronized { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { jobEnd match { case end: SparkListenerJobEnd => end.jobResult match { |