author    Josh Rosen <joshrosen@databricks.com>  2014-11-19 16:50:21 -0800
committer Josh Rosen <joshrosen@databricks.com>  2014-11-19 16:50:21 -0800
commit    04d462f648aba7b18fc293b7189b86af70e421bc (patch)
tree      d5816c007919740531c942b23a5f99e23cc7c3a6 /core
parent    c3002c4a61c4fc5b966aa384c41c3cba33de0aa6 (diff)
[SPARK-4495] Fix memory leak in JobProgressListener
This commit fixes a memory leak in JobProgressListener that I introduced in SPARK-2321 and adds a testing framework to ensure that it’s very difficult to inadvertently introduce new memory leaks. This solution might be overkill, but the main idea is to partition JobProgressListener's state into three buckets: collections that should be empty once Spark is idle, collections that must obey some hard size limit, and collections that have a soft size limit (they can grow arbitrarily large when Spark is active but must shrink to fit within some bound after Spark becomes idle).

Based on this, we can write fairly generic tests that run workloads that submit more than `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs, then check that these various collections' sizes obey their contracts.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3372 from JoshRosen/SPARK-4495 and squashes the following commits:

c73fab5 [Josh Rosen] "data structures" -> collections
be72e81 [Josh Rosen] [SPARK-4495] Fix memory leaks in JobProgressListener
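To make the three-bucket contract concrete, here is a minimal, self-contained Scala sketch of the idea the new tests enforce. Everything in it (the `BucketedState` object, its fields, and the `softBound` heuristic) is an illustrative stand-in, not code from this patch:

import scala.collection.mutable

// Illustrative sketch of the three state buckets described above.
object BucketedState {
  val retainedJobs = 5

  // Bucket 1: must be empty whenever Spark is idle.
  val activeJobs = mutable.HashMap[Int, String]()
  // Bucket 2: hard limit -- must never exceed its bound, even mid-workload.
  val completedJobs = mutable.ListBuffer[String]()
  // Bucket 3: soft limit -- may grow while jobs run, but must shrink back
  // under some bound derived from the retention settings once Spark is idle.
  val jobIdToData = mutable.HashMap[Int, String]()

  // Assumed bound for this sketch; the real bound depends on the settings.
  def softBound: Int = 2 * retainedJobs

  // The generic check a test harness can run after any workload:
  def checkContracts(sparkIsIdle: Boolean): Unit = {
    assert(completedJobs.size <= retainedJobs, "hard limit exceeded")
    if (sparkIsIdle) {
      assert(activeJobs.isEmpty, "active-state collection leaked entries")
      assert(jobIdToData.size <= softBound, "soft limit exceeded while idle")
    }
  }
}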
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala       | 113
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  | 100
2 files changed, 170 insertions, 43 deletions
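For reference, the retention limits that drive the trimming below are ordinary Spark configuration keys. A minimal sketch of setting them when building a `SparkConf` (the app name and values here are arbitrary examples, not recommendations):

import org.apache.spark.SparkConf

// Cap how many non-active stages and jobs the UI listener retains.
val conf = new SparkConf()
  .setAppName("retention-example")
  .set("spark.ui.retainedStages", "1000")
  .set("spark.ui.retainedJobs", "500")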
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8bbde51e18..ccdcf0e047 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
import JobProgressListener._
+ // Define a handful of type aliases so that data structures' types can serve as documentation.
+ // These type aliases are public because they're used in the types of public fields:
+
type JobId = Int
type StageId = Int
type StageAttemptId = Int
+ type PoolName = String
+ type ExecutorId = String
- // How many stages to remember
- val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
- // How many jobs to remember
- val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+ // Define all of our state:
+ // Jobs:
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
+ // Stages:
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]
-
- // Number of completed and failed stages, may not actually equal to completedStages.size and
- // failedStages.size respectively due to completedStage and failedStages only maintain the latest
- // part of the stages, the earlier ones will be removed when there are too many stages for
- // memory sake.
+ val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
+ // Total number of completed and failed stages that have ever been run. These may be greater than
+ // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
+ // JobProgressListener's retention limits.
var numCompletedStages = 0
var numFailedStages = 0
- // Map from pool name to a hash map (map from stage id to StageInfo).
- val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-
- val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
+ // Misc:
+ val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
+ def blockManagerIds = executorIdToBlockManagerId.values.toSeq
var schedulingMode: Option[SchedulingMode] = None
- def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+ // To limit the total memory usage of JobProgressListener, we only track information for a fixed
+ // number of non-active jobs and stages (there is no limit for active jobs and stages):
+
+ val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+ val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+
+ // We can test for memory leaks by ensuring that collections that track non-active jobs and
+ // stages do not grow without bound and that collections for active jobs/stages eventually become
+ // empty once Spark is idle. Let's partition our collections into ones that should be empty
+ // once Spark is idle and ones whose sizes are hard- or soft-limited.
+ // These methods are used by unit tests, but they're defined here so that people don't forget to
+ // update the tests when adding new collections. Some collections have multiple levels of
+ // nesting, etc., so this lets us customize our notion of "size" for each structure:
+
+ // These collections should all be empty once Spark is idle (no active stages / jobs):
+ private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
+ Map(
+ "activeStages" -> activeStages.size,
+ "activeJobs" -> activeJobs.size,
+ "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+ )
+ }
+
+ // These collections should stop growing once we have run at least `spark.ui.retainedStages`
+ // stages and `spark.ui.retainedJobs` jobs:
+ private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
+ Map(
+ "completedJobs" -> completedJobs.size,
+ "failedJobs" -> failedJobs.size,
+ "completedStages" -> completedStages.size,
+ "failedStages" -> failedStages.size
+ )
+ }
+
+ // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
+ // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
+ private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
+ Map(
+ "jobIdToData" -> jobIdToData.size,
+ "stageIdToData" -> stageIdToData.size,
+ "stageIdToStageInfo" -> stageIdToInfo.size
+ )
+ }
+
+ /** If stages is too large, remove and garbage collect old stages */
+ private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
+ if (stages.size > retainedStages) {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stages.take(toRemove).foreach { s =>
+ stageIdToData.remove((s.stageId, s.attemptId))
+ stageIdToInfo.remove(s.stageId)
+ }
+ stages.trimStart(toRemove)
+ }
+ }
+
+ /** If jobs is too large, remove and garbage collect old jobs */
+ private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
+ if (jobs.size > retainedJobs) {
+ val toRemove = math.max(retainedJobs / 10, 1)
+ jobs.take(toRemove).foreach { job =>
+ jobIdToData.remove(job.jobId)
+ }
+ jobs.trimStart(toRemove)
+ }
+ }
override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
+ trimJobsIfNecessary(completedJobs)
jobData.status = JobExecutionStatus.SUCCEEDED
case JobFailed(exception) =>
failedJobs += jobData
+ trimJobsIfNecessary(failedJobs)
jobData.status = JobExecutionStatus.FAILED
}
}
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (stage.failureReason.isEmpty) {
completedStages += stage
numCompletedStages += 1
- trimIfNecessary(completedStages)
+ trimStagesIfNecessary(completedStages)
} else {
failedStages += stage
numFailedStages += 1
- trimIfNecessary(failedStages)
- }
- }
-
- /** If stages is too large, remove and garbage collect old stages */
- private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
- if (stages.size > retainedStages) {
- val toRemove = math.max(retainedStages / 10, 1)
- stages.take(toRemove).foreach { s =>
- stageIdToData.remove((s.stageId, s.attemptId))
- stageIdToInfo.remove(s.stageId)
- }
- stages.trimStart(toRemove)
+ trimStagesIfNecessary(failedStages)
}
}
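An aside on the trimming strategy above: evicting `math.max(retained / 10, 1)` entries per call removes the oldest ~10% of the retention limit in one batch, so the cost of trimming is amortized instead of being paid on every completed stage or job. A standalone sketch of that arithmetic (the buffer contents and limit are stand-ins for the listener's fields):

import scala.collection.mutable.ListBuffer

object TrimSketch extends App {
  val retained = 5
  val stages = ListBuffer.tabulate(6)(i => s"stage-$i") // one past the limit

  if (stages.size > retained) {
    // Drop the oldest ~10% of the retention limit, but always at least one.
    val toRemove = math.max(retained / 10, 1)
    stages.trimStart(toRemove)
  }
  println(stages) // ListBuffer(stage-1, stage-2, stage-3, stage-4, stage-5)
}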
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7c102cc7f4..15c5b4e702 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
- test("test LRU eviction of stages") {
- val conf = new SparkConf()
- conf.set("spark.ui.retainedStages", 5.toString)
- val listener = new JobProgressListener(conf)
- def createStageStartEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
- SparkListenerStageSubmitted(stageInfo)
+ private def createStageStartEvent(stageId: Int) = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ SparkListenerStageSubmitted(stageInfo)
+ }
+
+ private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ if (failed) {
+ stageInfo.failureReason = Some("Failed!")
}
+ SparkListenerStageCompleted(stageInfo)
+ }
+
+ private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+ SparkListenerJobStart(jobId, stageIds)
+ }
- def createStageEndEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
- SparkListenerStageCompleted(stageInfo)
+ private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
+ val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
+ SparkListenerJobEnd(jobId, result)
+ }
+
+ private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
+ val stageIds = jobId * 100 to jobId * 100 + 50
+ listener.onJobStart(createJobStartEvent(jobId, stageIds))
+ for (stageId <- stageIds) {
+ listener.onStageSubmitted(createStageStartEvent(stageId))
+ listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
+ }
+ listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
+ }
+
+ private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
+ listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
+ assert(size === 0, s"$fieldName was not empty")
}
+ }
+
+ test("test LRU eviction of stages") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ val listener = new JobProgressListener(conf)
for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}
+ assertActiveJobsStateIsEmpty(listener)
listener.completedStages.size should be (5)
- listener.completedStages.count(_.stageId == 50) should be (1)
- listener.completedStages.count(_.stageId == 49) should be (1)
- listener.completedStages.count(_.stageId == 48) should be (1)
- listener.completedStages.count(_.stageId == 47) should be (1)
- listener.completedStages.count(_.stageId == 46) should be (1)
+ listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+ }
+
+ test("test LRU eviction of jobs") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ conf.set("spark.ui.retainedJobs", 5.toString)
+ val listener = new JobProgressListener(conf)
+
+ // Run a bunch of jobs to get the listener into a state where we've exceeded both the
+ // job and stage retention limits:
+ for (jobId <- 1 to 10) {
+ runJob(listener, jobId, shouldFail = false)
+ }
+ for (jobId <- 200 to 210) {
+ runJob(listener, jobId, shouldFail = true)
+ }
+ assertActiveJobsStateIsEmpty(listener)
+ // Snapshot the sizes of various soft- and hard-size-limited collections:
+ val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
+ val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
+ // Run some more jobs:
+ for (jobId <- 11 to 50) {
+ runJob(listener, jobId, shouldFail = false)
+ // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+ listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+ listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+ }
+
+ listener.completedJobs.size should be (5)
+ listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+
+ for (jobId <- 51 to 100) {
+ runJob(listener, jobId, shouldFail = true)
+ // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
+ listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+ listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+ }
+ assertActiveJobsStateIsEmpty(listener)
+
+ // Completed and failed jobs each have their own size limits, so these should still be the same:
+ listener.completedJobs.size should be (5)
+ listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
+ listener.failedJobs.size should be (5)
+ listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
}
test("test executor id to summary") {