author    Alex Bozarth <ajbozart@us.ibm.com>  2016-08-24 14:39:41 -0500
committer Tom Graves <tgraves@yahoo-inc.com>  2016-08-24 14:39:41 -0500
commit    891ac2b914fb6f90a62c6fbc0a3960a89d1c1d92 (patch)
tree      19d1eea2303c7b29c310047d40e04a1b5f694cc7 /core/src/main/scala
parent    40b30fcf453169534cb53d01cd22236210b13005 (diff)
[SPARK-15083][WEB UI] History Server can OOM due to unlimited TaskUIData
## What changes were proposed in this pull request?

Based on #12990 by tankkyo.

Since the History Server currently loads all applications' data, it can OOM if too many applications have a significant task count. With this change, `spark.ui.retainedTasks` (default: 100000) caps how many tasks are retained per stage; once the cap is exceeded, the oldest tasks are dropped.

(This is a "quick fix" to help those running into the problem until an update of how the History Server loads app data can be done.)

## How was this patch tested?

Manual testing and dev/run-tests

![spark-15083](https://cloud.githubusercontent.com/assets/13952758/17713694/fe82d246-63b0-11e6-9697-b87ea75ff4ef.png)

Author: Alex Bozarth <ajbozart@us.ibm.com>

Closes #14673 from ajbozarth/spark15083.
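For anyone applying this patch, a minimal sketch of setting the new cap from application code; the app name, master, and the 50000 value are illustrative, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: cap the per-stage task data kept for the UI at 50000.
// The patch's default is 100000 (see UI_RETAINED_TASKS in the diff below).
val conf = new SparkConf()
  .setAppName("retained-tasks-demo") // hypothetical app name
  .setMaster("local[*]")
  .set("spark.ui.retainedTasks", "50000")

val sc = new SparkContext(conf)
```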
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala       5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala   9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala            12
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                4
4 files changed, 25 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index be3dac4d24..47174e4efe 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,4 +114,9 @@ package object config {
private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
.stringConf
.createOptional
+
+ // To limit memory usage, we only track information for a fixed number of tasks
+ private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
+ .intConf
+ .createWithDefault(100000)
}
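For context, `ConfigBuilder` is Spark's internal typed-config DSL, and the entry above is `private[spark]`, so it is only readable from Spark's own packages. A hedged sketch of the read path, assuming code living inside the `org.apache.spark` package (the demo subpackage and object name are mine):

```scala
package org.apache.spark.demo // hypothetical spark-internal package

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

object RetainedTasksDemo {
  def main(args: Array[String]): Unit = {
    // conf.get(entry) is typed: UI_RETAINED_TASKS yields an Int and falls
    // back to its createWithDefault value (100000) when the key is unset.
    val conf = new SparkConf().set("spark.ui.retainedTasks", "20000")
    println(conf.get(UI_RETAINED_TASKS)) // 20000
  }
}
```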
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 491f7160bc..d3a4f9d322 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,12 +19,13 @@ package org.apache.spark.ui.jobs
import java.util.concurrent.TimeoutException
-import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
@@ -93,6 +94,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
+ val retainedTasks = conf.get(UI_RETAINED_TASKS)
// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -405,6 +407,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
+ // If the task list grows past the cap, drop the oldest tasks so they can be garbage collected
+ if (stageData.taskData.size > retainedTasks) {
+ stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+ }
+
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
jobId <- activeJobsDependentOnStage;
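A self-contained sketch of the eviction step above, with toy values; it works because `LinkedHashMap` iterates in insertion order, so `drop` discards the oldest-recorded tasks first:

```scala
import scala.collection.mutable.LinkedHashMap

// Toy stand-in for stageData.taskData; keys are task IDs in recording order.
var taskData = LinkedHashMap(1L -> "t1", 2L -> "t2", 3L -> "t3", 4L -> "t4")
val retainedTasks = 2 // illustrative; the patch's default is 100000

// Same shape as the patch: drop the oldest entries beyond the cap.
if (taskData.size > retainedTasks) {
  taskData = taskData.drop(taskData.size - retainedTasks)
}
// taskData now holds only tasks 3 and 4, the most recently recorded.
```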
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index ea7acc4734..a266164587 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -133,7 +133,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
- val numCompleted = tasks.count(_.taskInfo.finished)
+ val numCompleted = stageData.numCompleteTasks
+ val totalTasks = stageData.numActiveTasks +
+ stageData.numCompleteTasks + stageData.numFailedTasks
+ val totalTasksNumStr = if (totalTasks == tasks.size) {
+ s"$totalTasks"
+ } else {
+ s"$totalTasks, showing ${tasks.size}"
+ }
val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
@@ -591,7 +598,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
aggMetrics ++
maybeAccumulableTable ++
- <h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
+ <h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
+ taskTableHTML ++ jsForScrollingDownToTaskTable
UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
}
}
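The header logic condensed into a hypothetical helper (the name is mine, not the patch's), showing the two display cases:

```scala
// Mirrors the inline totalTasksNumStr logic above.
def totalTasksNumStr(totalTasks: Int, shown: Int): String =
  if (totalTasks == shown) s"$totalTasks" else s"$totalTasks, showing $shown"

totalTasksNumStr(8, 8)           // "8"                      -- nothing trimmed
totalTasksNumStr(250000, 100000) // "250000, showing 100000" -- tasks trimmed
```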
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 20dde7cec8..66b88129ee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ui.jobs
import scala.collection.mutable
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, LinkedHashMap}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
@@ -97,7 +97,7 @@ private[spark] object UIData {
var description: Option[String] = None
var accumulables = new HashMap[Long, AccumulableInfo]
- var taskData = new HashMap[Long, TaskUIData]
+ var taskData = new LinkedHashMap[Long, TaskUIData]
var executorSummary = new HashMap[String, ExecutorSummary]
def hasInput: Boolean = inputBytes > 0
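A note on the `HashMap` to `LinkedHashMap` switch above: plain `HashMap` iteration order is unspecified, so trimming "the first n entries" would evict arbitrary tasks, while `LinkedHashMap` guarantees insertion order. A quick illustration:

```scala
import scala.collection.mutable.{HashMap, LinkedHashMap}

val hashed = HashMap(1L -> "a", 2L -> "b", 3L -> "c")
val linked = LinkedHashMap(1L -> "a", 2L -> "b", 3L -> "c")

hashed.keys.toList // order unspecified: trimming here would be arbitrary
linked.keys.toList // List(1, 2, 3): always insertion order, oldest first
```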