-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      | 113
-rw-r--r-- | core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 100
2 files changed, 170 insertions, 43 deletions
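
Both files below revolve around two existing Spark configuration keys, `spark.ui.retainedStages` and `spark.ui.retainedJobs`. As a quick illustration (the values here are arbitrary, not Spark's defaults), a user caps the UI listener's memory footprint like this:

    import org.apache.spark.SparkConf

    // Illustrative values only: lower retention caps make the web UI remember
    // fewer completed/failed stages and jobs, trading history for memory.
    val conf = new SparkConf()
      .set("spark.ui.retainedStages", "100")
      .set("spark.ui.retainedJobs", "100")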
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8bbde51e18..ccdcf0e047 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
+  // Define a handful of type aliases so that data structures' types can serve as documentation.
+  // These type aliases are public because they're used in the types of public fields:
+
   type JobId = Int
   type StageId = Int
   type StageAttemptId = Int
+  type PoolName = String
+  type ExecutorId = String
 
-  // How many stages to remember
-  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  // How many jobs to remember
-  val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+  // Define all of our state:
+
+  // Jobs:
   val activeJobs = new HashMap[JobId, JobUIData]
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
 
+  // Stages:
   val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
   val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
   val stageIdToInfo = new HashMap[StageId, StageInfo]
-
-  // Number of completed and failed stages, may not actually equal to completedStages.size and
-  // failedStages.size respectively due to completedStage and failedStages only maintain the latest
-  // part of the stages, the earlier ones will be removed when there are too many stages for
-  // memory sake.
+  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
+  // Total of completed and failed stages that have ever been run.  These may be greater than
+  // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
+  // JobProgressListener's retention limits.
   var numCompletedStages = 0
   var numFailedStages = 0
 
-  // Map from pool name to a hash map (map from stage id to StageInfo).
-  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-
-  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
+  // Misc:
+  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
+  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
 
   var schedulingMode: Option[SchedulingMode] = None
 
-  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+  // To limit the total memory usage of JobProgressListener, we only track information for a fixed
+  // number of non-active jobs and stages (there is no limit for active jobs and stages):
+
+  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+
+  // We can test for memory leaks by ensuring that collections that track non-active jobs and
+  // stages do not grow without bound and that collections for active jobs/stages eventually become
+  // empty once Spark is idle.  Let's partition our collections into ones that should be empty
+  // once Spark is idle and ones that should have hard- or soft-limited sizes.
+  // These methods are used by unit tests, but they're defined here so that people don't forget to
+  // update the tests when adding new collections.  Some collections have multiple levels of
+  // nesting, etc, so this lets us customize our notion of "size" for each structure:
+
+  // These collections should all be empty once Spark is idle (no active stages / jobs):
+  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
+    Map(
+      "activeStages" -> activeStages.size,
+      "activeJobs" -> activeJobs.size,
+      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+    )
+  }
+
+  // These collections should stop growing once we have run at least `spark.ui.retainedStages`
+  // stages and `spark.ui.retainedJobs` jobs:
+  private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
+    Map(
+      "completedJobs" -> completedJobs.size,
+      "failedJobs" -> failedJobs.size,
+      "completedStages" -> completedStages.size,
+      "failedStages" -> failedStages.size
+    )
+  }
+
+  // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
+  // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
+  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
+    Map(
+      "jobIdToData" -> jobIdToData.size,
+      "stageIdToData" -> stageIdToData.size,
+      "stageIdToStageInfo" -> stageIdToInfo.size
+    )
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
+    if (stages.size > retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
+        stageIdToData.remove((s.stageId, s.attemptId))
+        stageIdToInfo.remove(s.stageId)
+      }
+      stages.trimStart(toRemove)
+    }
+  }
+
+  /** If jobs is too large, remove and garbage collect old jobs */
+  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
+    if (jobs.size > retainedJobs) {
+      val toRemove = math.max(retainedJobs / 10, 1)
+      jobs.take(toRemove).foreach { job =>
+        jobIdToData.remove(job.jobId)
+      }
+      jobs.trimStart(toRemove)
+    }
+  }
 
   override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
     val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
+        trimJobsIfNecessary(completedJobs)
         jobData.status = JobExecutionStatus.SUCCEEDED
       case JobFailed(exception) =>
         failedJobs += jobData
+        trimJobsIfNecessary(failedJobs)
         jobData.status = JobExecutionStatus.FAILED
     }
   }
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (stage.failureReason.isEmpty) {
       completedStages += stage
       numCompletedStages += 1
-      trimIfNecessary(completedStages)
+      trimStagesIfNecessary(completedStages)
     } else {
       failedStages += stage
       numFailedStages += 1
-      trimIfNecessary(failedStages)
-    }
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
-    if (stages.size > retainedStages) {
-      val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToData.remove((s.stageId, s.attemptId))
-        stageIdToInfo.remove(s.stageId)
-      }
-      stages.trimStart(toRemove)
+      trimStagesIfNecessary(failedStages)
     }
   }
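
To make the eviction policy above concrete: `trimStagesIfNecessary` and `trimJobsIfNecessary` both drop the oldest ten percent of entries (but at least one) once a buffer exceeds its retention limit, and clean up the side tables (`stageIdToData`, `jobIdToData`) for the evicted entries. A minimal standalone sketch of the same strategy, using a plain `ListBuffer[Int]` in place of the UI data types (the helper name is ours, not the patch's):

    import scala.collection.mutable.ListBuffer

    // Sketch of the amortized trimming policy in the patch (not the Spark code itself):
    // once `buffer` exceeds `retained`, drop the oldest 10% (but at least one entry).
    def trimIfOverLimit[T](buffer: ListBuffer[T], retained: Int)(onRemove: T => Unit): Unit = {
      if (buffer.size > retained) {
        val toRemove = math.max(retained / 10, 1)
        buffer.take(toRemove).foreach(onRemove)  // caller removes matching side-table entries
        buffer.trimStart(toRemove)
      }
    }

    // With retained = 5, a sixth entry evicts exactly the oldest one:
    val completed = ListBuffer(1, 2, 3, 4, 5, 6)
    trimIfOverLimit(completed, retained = 5)(id => println(s"evicting $id"))
    assert(completed == ListBuffer(2, 3, 4, 5, 6))

Removing a batch at a time, rather than one entry on every insertion, keeps trimming cost amortized, which matters because these methods run inside the listener's synchronized event-handling path.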
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7c102cc7f4..15c5b4e702 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils
 
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
 
-  test("test LRU eviction of stages") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-
-    def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageSubmitted(stageInfo)
+  private def createStageStartEvent(stageId: Int) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    SparkListenerStageSubmitted(stageInfo)
+  }
+
+  private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    if (failed) {
+      stageInfo.failureReason = Some("Failed!")
     }
+    SparkListenerStageCompleted(stageInfo)
+  }
+
+  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+    SparkListenerJobStart(jobId, stageIds)
+  }
 
-    def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageCompleted(stageInfo)
+  private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
+    val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
+    SparkListenerJobEnd(jobId, result)
+  }
+
+  private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
+    val stageIds = jobId * 100 to jobId * 100 + 50
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
+  }
+
+  private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
+    listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
+      assert(size === 0, s"$fieldName was not empty")
     }
+  }
+
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
 
     for (i <- 1 to 50) {
       listener.onStageSubmitted(createStageStartEvent(i))
       listener.onStageCompleted(createStageEndEvent(i))
     }
+    assertActiveJobsStateIsEmpty(listener)
 
     listener.completedStages.size should be (5)
-    listener.completedStages.count(_.stageId == 50) should be (1)
-    listener.completedStages.count(_.stageId == 49) should be (1)
-    listener.completedStages.count(_.stageId == 48) should be (1)
-    listener.completedStages.count(_.stageId == 47) should be (1)
-    listener.completedStages.count(_.stageId == 46) should be (1)
+    listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+  }
+
+  test("test LRU eviction of jobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run a bunch of jobs to get the listener into a state where we've exceeded both the
+    // job and stage retention limits:
+    for (jobId <- 1 to 10) {
+      runJob(listener, jobId, shouldFail = false)
+    }
+    for (jobId <- 200 to 210) {
+      runJob(listener, jobId, shouldFail = true)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+
+    // Snapshot the sizes of various soft- and hard-size-limited collections:
+    val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
+    val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
+
+    // Run some more jobs:
+    for (jobId <- 11 to 50) {
+      runJob(listener, jobId, shouldFail = false)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+    }
+
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+
+    for (jobId <- 51 to 100) {
+      runJob(listener, jobId, shouldFail = true)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+
+    // Completed and failed jobs each have their own size limits, so this should still be the same:
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+    listener.failedJobs.size should be (5)
+    listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
   }
 
   test("test executor id to summary") {
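
The new "LRU eviction of jobs" test is built on a snapshot-then-compare pattern: once the retention limits have been exceeded, record the bounded collections' sizes, run more work, and assert that the sizes never change. A generic sketch of that pattern (the helper is ours, not the suite's):

    // `sizes` reports the sizes of the bounded collections; after the system has
    // reached steady state, additional work must not grow any of them.
    def assertNoGrowth[K](sizes: () => Map[K, Int], work: () => Unit, rounds: Int): Unit = {
      val baseline = sizes()  // snapshot taken after retention limits were exceeded
      for (_ <- 1 to rounds) {
        work()
        val now = sizes()
        assert(now == baseline, s"collections grew: expected $baseline, got $now")
      }
    }

Checking the sizes inside the loop, as the suite does, rather than once at the end catches transient growth that a final-state check would miss.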