-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      | 113
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 100
2 files changed, 170 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8bbde51e18..ccdcf0e047 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
import JobProgressListener._
+ // Define a handful of type aliases so that data structures' types can serve as documentation.
+ // These type aliases are public because they're used in the types of public fields:
+
type JobId = Int
type StageId = Int
type StageAttemptId = Int
+ type PoolName = String
+ type ExecutorId = String
- // How many stages to remember
- val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
- // How many jobs to remember
- val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+ // Define all of our state:
+ // Jobs:
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
+ // Stages:
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]
-
- // Number of completed and failed stages, may not actually equal to completedStages.size and
- // failedStages.size respectively due to completedStage and failedStages only maintain the latest
- // part of the stages, the earlier ones will be removed when there are too many stages for
- // memory sake.
+ val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
+ // Total number of completed and failed stages that have ever been run. These may be greater
+ // than `completedStages.size` and `failedStages.size` because those collections only retain
+ // the most recent stages, up to JobProgressListener's retention limit.
var numCompletedStages = 0
var numFailedStages = 0
- // Map from pool name to a hash map (map from stage id to StageInfo).
- val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-
- val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
+ // Misc:
+ val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
+ def blockManagerIds = executorIdToBlockManagerId.values.toSeq
var schedulingMode: Option[SchedulingMode] = None
- def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+ // To limit the total memory usage of JobProgressListener, we only track information for a fixed
+ // number of non-active jobs and stages (there is no limit for active jobs and stages):
+
+ val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+ val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+
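
For reference, a minimal sketch (not part of this patch) of how these limits are supplied through SparkConf, mirroring what the test suite below does; it assumes the caller lives in a Spark package (as the tests do) so it can construct the listener directly, and the numeric values are arbitrary examples:

    import org.apache.spark.SparkConf
    import org.apache.spark.ui.jobs.JobProgressListener

    val conf = new SparkConf()
      .set("spark.ui.retainedStages", "100")  // cap on tracked non-active stages
      .set("spark.ui.retainedJobs", "50")     // cap on tracked non-active jobs
    val listener = new JobProgressListener(conf)  // unset keys fall back to the defaults above
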
+ // We can test for memory leaks by ensuring that collections that track non-active jobs and
+ // stages do not grow without bound and that collections for active jobs/stages eventually become
+ // empty once Spark is idle. Let's partition our collections into ones that should be empty
+ // once Spark is idle and ones whose sizes should be hard- or soft-limited.
+ // These methods are used by unit tests, but they're defined here so that people don't forget to
+ // update the tests when adding new collections. Some collections have multiple levels of
+ // nesting, etc., so this lets us customize our notion of "size" for each structure:
+
+ // These collections should all be empty once Spark is idle (no active stages / jobs):
+ private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
+ Map(
+ "activeStages" -> activeStages.size,
+ "activeJobs" -> activeJobs.size,
+ "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+ )
+ }
+
+ // These collections should stop growing once we have run at least `spark.ui.retainedStages`
+ // stages and `spark.ui.retainedJobs` jobs:
+ private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
+ Map(
+ "completedJobs" -> completedJobs.size,
+ "failedJobs" -> failedJobs.size,
+ "completedStages" -> completedStages.size,
+ "failedStages" -> failedStages.size
+ )
+ }
+
+ // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
+ // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
+ private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
+ Map(
+ "jobIdToData" -> jobIdToData.size,
+ "stageIdToData" -> stageIdToData.size,
+ "stageIdToStageInfo" -> stageIdToInfo.size
+ )
+ }
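
A minimal sketch (not part of this patch) of how a leak check could consume these size maps from a test in the same package; the helper name and the per-collection limits map are illustrative assumptions, not Spark API:

    def assertRetentionRespected(listener: JobProgressListener): Unit = {
      // Active-state collections must drain completely once Spark is idle.
      listener.getSizesOfActiveStateTrackingCollections.foreach { case (name, size) =>
        assert(size == 0, s"$name should be empty once Spark is idle")
      }
      // Hard-limited collections must never exceed their configured retention limits.
      val limits = Map(
        "completedJobs" -> listener.retainedJobs,
        "failedJobs" -> listener.retainedJobs,
        "completedStages" -> listener.retainedStages,
        "failedStages" -> listener.retainedStages)
      listener.getSizesOfHardSizeLimitedCollections.foreach { case (name, size) =>
        assert(size <= limits(name), s"$name exceeded its retention limit")
      }
    }
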
+
+ /** If stages is too large, remove and garbage collect old stages */
+ private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
+ if (stages.size > retainedStages) {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stages.take(toRemove).foreach { s =>
+ stageIdToData.remove((s.stageId, s.attemptId))
+ stageIdToInfo.remove(s.stageId)
+ }
+ stages.trimStart(toRemove)
+ }
+ }
+
+ /** If jobs is too large, remove and garbage collect old jobs */
+ private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
+ if (jobs.size > retainedJobs) {
+ val toRemove = math.max(retainedJobs / 10, 1)
+ jobs.take(toRemove).foreach { job =>
+ jobIdToData.remove(job.jobId)
+ }
+ jobs.trimStart(toRemove)
+ }
+ }
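
For context, a standalone sketch (not part of this patch) of the same trimming arithmetic applied to a plain ListBuffer[Int], with an assumed limit of 5: once the buffer exceeds the limit, the oldest max(limit / 10, 1) entries are dropped from the front.

    import scala.collection.mutable.ListBuffer

    val retained = 5                           // assumed limit, analogous to retainedStages
    val buffer = ListBuffer(1, 2, 3, 4, 5, 6)  // size 6 exceeds the limit of 5
    if (buffer.size > retained) {
      val toRemove = math.max(retained / 10, 1)  // 5 / 10 == 0, so at least 1 entry is removed
      buffer.trimStart(toRemove)                 // drops the oldest entry, 1
    }
    // buffer is now ListBuffer(2, 3, 4, 5, 6)
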
override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
+ trimJobsIfNecessary(completedJobs)
jobData.status = JobExecutionStatus.SUCCEEDED
case JobFailed(exception) =>
failedJobs += jobData
+ trimJobsIfNecessary(failedJobs)
jobData.status = JobExecutionStatus.FAILED
}
}
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (stage.failureReason.isEmpty) {
completedStages += stage
numCompletedStages += 1
- trimIfNecessary(completedStages)
+ trimStagesIfNecessary(completedStages)
} else {
failedStages += stage
numFailedStages += 1
- trimIfNecessary(failedStages)
- }
- }
-
- /** If stages is too large, remove and garbage collect old stages */
- private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
- if (stages.size > retainedStages) {
- val toRemove = math.max(retainedStages / 10, 1)
- stages.take(toRemove).foreach { s =>
- stageIdToData.remove((s.stageId, s.attemptId))
- stageIdToInfo.remove(s.stageId)
- }
- stages.trimStart(toRemove)
+ trimStagesIfNecessary(failedStages)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7c102cc7f4..15c5b4e702 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
- test("test LRU eviction of stages") {
- val conf = new SparkConf()
- conf.set("spark.ui.retainedStages", 5.toString)
- val listener = new JobProgressListener(conf)
- def createStageStartEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
- SparkListenerStageSubmitted(stageInfo)
+ private def createStageStartEvent(stageId: Int) = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ SparkListenerStageSubmitted(stageInfo)
+ }
+
+ private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ if (failed) {
+ stageInfo.failureReason = Some("Failed!")
}
+ SparkListenerStageCompleted(stageInfo)
+ }
+
+ private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+ SparkListenerJobStart(jobId, stageIds)
+ }
- def createStageEndEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
- SparkListenerStageCompleted(stageInfo)
+ private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
+ val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
+ SparkListenerJobEnd(jobId, result)
+ }
+
+ private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
+ val stageIds = jobId * 100 to jobId * 100 + 50
+ listener.onJobStart(createJobStartEvent(jobId, stageIds))
+ for (stageId <- stageIds) {
+ listener.onStageSubmitted(createStageStartEvent(stageId))
+ listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
+ }
+ listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
+ }
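
A quick worked example (not part of this patch) of the stage-id ranges that runJob generates: because each range starts at jobId * 100 and spans only 51 ids, ranges for distinct jobs never overlap.

    val ranges = Seq(1, 2, 10).map(jobId => jobId -> (jobId * 100 to jobId * 100 + 50))
    // ranges: Seq((1, 100 to 150), (2, 200 to 250), (10, 1000 to 1050))
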
+
+ private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
+ listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
+ assert(size === 0, s"$fieldName was not empty")
}
+ }
+
+ test("test LRU eviction of stages") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ val listener = new JobProgressListener(conf)
for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}
+ assertActiveJobsStateIsEmpty(listener)
listener.completedStages.size should be (5)
- listener.completedStages.count(_.stageId == 50) should be (1)
- listener.completedStages.count(_.stageId == 49) should be (1)
- listener.completedStages.count(_.stageId == 48) should be (1)
- listener.completedStages.count(_.stageId == 47) should be (1)
- listener.completedStages.count(_.stageId == 46) should be (1)
+ listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+ }
+
+ test("test LRU eviction of jobs") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ conf.set("spark.ui.retainedJobs", 5.toString)
+ val listener = new JobProgressListener(conf)
+
+ // Run a bunch of jobs to get the listener into a state where we've exceeded both the
+ // job and stage retention limits:
+ for (jobId <- 1 to 10) {
+ runJob(listener, jobId, shouldFail = false)
+ }
+ for (jobId <- 200 to 210) {
+ runJob(listener, jobId, shouldFail = true)
+ }
+ assertActiveJobsStateIsEmpty(listener)
+ // Snapshot the sizes of various soft- and hard-size-limited collections:
+ val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
+ val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
+ // Run some more jobs:
+ for (jobId <- 11 to 50) {
+ runJob(listener, jobId, shouldFail = false)
+ // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+ listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+ listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+ }
+
+ listener.completedJobs.size should be (5)
+ listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+
+ for (jobId <- 51 to 100) {
+ runJob(listener, jobId, shouldFail = true)
+ // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+ listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+ listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+ }
+ assertActiveJobsStateIsEmpty(listener)
+
+ // Completed and failed jobs each have their own size limits, so these should still be the same:
+ listener.completedJobs.size should be (5)
+ listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+ listener.failedJobs.size should be (5)
+ listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
}
test("test executor id to summary") {