author     Shixiong Zhu <shixiong@databricks.com>    2016-05-19 12:05:17 -0700
committer  Andrew Or <andrew@databricks.com>         2016-05-19 12:05:17 -0700
commit     4e3cb7a5d965fd490390398ecfe35f1fc05e8511 (patch)
tree       aa7b9e92c8196e6d02fbed6e8a5dd1ec3754de0c /core
parent     6ac1c3a040f88fae15c46acd73e7e3691f7d3619 (diff)
[SPARK-15317][CORE] Don't store accumulators for every task in listeners
## What changes were proposed in this pull request?

In general, the Web UI doesn't need to store the Accumulator/AccumulableInfo for every task; it only needs the accumulator values. This PR creates new UIData classes that hold only the necessary fields and makes `JobProgressListener` store these classes instead, so that `JobProgressListener` no longer retains Accumulator/AccumulableInfo and its retained size becomes pretty small. I also eliminate `AccumulableInfo` from `SQLListener` so that we don't keep any references to those unused `AccumulableInfo`s.

## How was this patch tested?

I ran the two tests reported in the JIRA locally.

The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
The retained size of JobProgressListener decreases from 60.7M to 6.9M.

The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```
This test no longer causes an OOM after applying this patch.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13153 from zsxwing/memory.
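For readers skimming the patch, the core idea is a snapshot pattern: convert each task's mutable, accumulator-carrying `TaskMetrics` into a small immutable value object and retain only that in the listener. Below is a minimal, self-contained Scala sketch of that pattern; the names `MutableMetrics` and `MetricsSnapshot` are invented for illustration and are not part of this patch (the real classes are `TaskMetricsUIData` and friends in `UIData.scala`).
```
// Illustrative sketch only: hypothetical stand-ins for the
// TaskMetrics -> TaskMetricsUIData conversion introduced in this patch.
object SnapshotPatternSketch {
  // Stand-in for the heavyweight, mutable per-task metrics object,
  // which also carries accumulator references the UI does not need.
  class MutableMetrics(
      var runTime: Long,
      var resultSize: Long,
      var accumulators: Seq[String])

  // Compact, immutable snapshot that the listener retains instead.
  case class MetricsSnapshot(runTime: Long, resultSize: Long)

  object MetricsSnapshot {
    // Copy only the fields the UI needs; drop everything else.
    def apply(m: MutableMetrics): MetricsSnapshot =
      new MetricsSnapshot(runTime = m.runTime, resultSize = m.resultSize)
  }

  def main(args: Array[String]): Unit = {
    val live = new MutableMetrics(42L, 1024L, Seq("internal.metrics.x", "sql"))
    // The listener stores only the snapshot, so the accumulator references
    // become unreachable (and collectable) once the task finishes.
    val retained = MetricsSnapshot(live)
    println(retained) // MetricsSnapshot(42,1024)
  }
}
```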
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala       |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala           |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                     |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                        | 136
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala                    |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala  |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala      |  29
7 files changed, 172 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index eddc36edc9..7d63a8f734 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -20,10 +20,10 @@ import java.util.{Arrays, Date, List => JList}
import javax.ws.rs.{GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType
-import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
+import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, ShuffleReadMetricsUIData => InternalShuffleReadMetrics, ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => InternalTaskMetrics}
import org.apache.spark.util.Distribution
@Produces(Array(MediaType.APPLICATION_JSON))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 945830c8bf..842f42b4c9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -332,7 +332,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})
stageData.numActiveTasks += 1
- stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics)))
+ stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -395,9 +395,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
- val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
- taskData.taskInfo = info
- taskData.metrics = taskMetrics
+ val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+ taskData.updateTaskInfo(info)
+ taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
for (
@@ -425,7 +425,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData: StageUIData,
execId: String,
taskMetrics: TaskMetrics,
- oldMetrics: Option[TaskMetrics]) {
+ oldMetrics: Option[TaskMetricsUIData]) {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
val shuffleWriteDelta =
@@ -503,7 +503,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (!t.taskInfo.finished) {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
- t.metrics = Some(metrics)
+ t.updateTaskMetrics(Some(metrics))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6ddabfd8ef..d986a55959 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -768,7 +768,7 @@ private[ui] object StagePage {
}
private[ui] def getSchedulerDelay(
- info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+ info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
if (info.finished) {
val totalExecutionTime = info.finishTime - info.launchTime
val executorOverhead = (metrics.executorDeserializeTime +
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index b454ef1b20..d76a0e657c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,8 +21,10 @@ import scala.collection.mutable
import scala.collection.mutable.HashMap
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
private[spark] object UIData {
@@ -105,13 +107,137 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
- class TaskUIData(
- var taskInfo: TaskInfo,
- var metrics: Option[TaskMetrics] = None,
- var errorMessage: Option[String] = None)
+ class TaskUIData private(
+ private var _taskInfo: TaskInfo,
+ private var _metrics: Option[TaskMetricsUIData]) {
+
+ var errorMessage: Option[String] = None
+
+ def taskInfo: TaskInfo = _taskInfo
+
+ def metrics: Option[TaskMetricsUIData] = _metrics
+
+ def updateTaskInfo(taskInfo: TaskInfo): Unit = {
+ _taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
+ }
+
+ def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
+ _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+ }
+ }
+
+ object TaskUIData {
+ def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
+ new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
+ }
+
+ private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
+ metrics.map { m =>
+ TaskMetricsUIData(
+ executorDeserializeTime = m.executorDeserializeTime,
+ executorRunTime = m.executorRunTime,
+ resultSize = m.resultSize,
+ jvmGCTime = m.jvmGCTime,
+ resultSerializationTime = m.resultSerializationTime,
+ memoryBytesSpilled = m.memoryBytesSpilled,
+ diskBytesSpilled = m.diskBytesSpilled,
+ peakExecutionMemory = m.peakExecutionMemory,
+ updatedBlockStatuses = m.updatedBlockStatuses.toList,
+ inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
+ outputMetrics =
+ OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
+ shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+ shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+ }
+ }
+
+ /**
+ * We don't need to store internal or SQL accumulables as their values will be shown in other
+ * places, so drop them to reduce the memory usage.
+ */
+ private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = {
+ val newTaskInfo = new TaskInfo(
+ taskId = taskInfo.taskId,
+ index = taskInfo.index,
+ attemptNumber = taskInfo.attemptNumber,
+ launchTime = taskInfo.launchTime,
+ executorId = taskInfo.executorId,
+ host = taskInfo.host,
+ taskLocality = taskInfo.taskLocality,
+ speculative = taskInfo.speculative
+ )
+ newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
+ newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
+ accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
+ }
+ newTaskInfo.finishTime = taskInfo.finishTime
+ newTaskInfo.failed = taskInfo.failed
+ newTaskInfo
+ }
+ }
class ExecutorUIData(
val startTime: Long,
var finishTime: Option[Long] = None,
var finishReason: Option[String] = None)
+
+ case class TaskMetricsUIData(
+ executorDeserializeTime: Long,
+ executorRunTime: Long,
+ resultSize: Long,
+ jvmGCTime: Long,
+ resultSerializationTime: Long,
+ memoryBytesSpilled: Long,
+ diskBytesSpilled: Long,
+ peakExecutionMemory: Long,
+ updatedBlockStatuses: Seq[(BlockId, BlockStatus)],
+ inputMetrics: InputMetricsUIData,
+ outputMetrics: OutputMetricsUIData,
+ shuffleReadMetrics: ShuffleReadMetricsUIData,
+ shuffleWriteMetrics: ShuffleWriteMetricsUIData)
+
+ case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
+
+ case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
+
+ case class ShuffleReadMetricsUIData(
+ remoteBlocksFetched: Long,
+ localBlocksFetched: Long,
+ remoteBytesRead: Long,
+ localBytesRead: Long,
+ fetchWaitTime: Long,
+ recordsRead: Long,
+ totalBytesRead: Long,
+ totalBlocksFetched: Long)
+
+ object ShuffleReadMetricsUIData {
+ def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
+ new ShuffleReadMetricsUIData(
+ remoteBlocksFetched = metrics.remoteBlocksFetched,
+ localBlocksFetched = metrics.localBlocksFetched,
+ remoteBytesRead = metrics.remoteBytesRead,
+ localBytesRead = metrics.localBytesRead,
+ fetchWaitTime = metrics.fetchWaitTime,
+ recordsRead = metrics.recordsRead,
+ totalBytesRead = metrics.totalBytesRead,
+ totalBlocksFetched = metrics.totalBlocksFetched
+ )
+ }
+ }
+
+ case class ShuffleWriteMetricsUIData(
+ bytesWritten: Long,
+ recordsWritten: Long,
+ writeTime: Long)
+
+ object ShuffleWriteMetricsUIData {
+ def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
+ new ShuffleWriteMetricsUIData(
+ bytesWritten = metrics.bytesWritten,
+ recordsWritten = metrics.recordsWritten,
+ writeTime = metrics.writeTime
+ )
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0cf9df084f..13cb6a28c3 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -255,6 +255,9 @@ private[spark] object AccumulatorContext {
def clear(): Unit = {
originals.clear()
}
+
+ // Identifier for distinguishing SQL metrics from other accumulators
+ private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
}
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
index d223af1496..f684e16c25 100644
--- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -30,8 +30,8 @@ class AllStagesResourceSuite extends SparkFunSuite {
def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = {
val tasks = new HashMap[Long, TaskUIData]
taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
- tasks(idx.toLong) = new TaskUIData(
- new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None)
+ tasks(idx.toLong) = TaskUIData(
+ new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
}
val stageUiData = new StageUIData()
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index ce7d51d1c3..6f7c9f282a 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -25,7 +25,8 @@ import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor._
import org.apache.spark.scheduler._
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.jobs.UIData.TaskUIData
+import org.apache.spark.util.{AccumulatorContext, Utils}
class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
@@ -359,4 +360,30 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(
stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
}
+
+ test("drop internal and sql accumulators") {
+ val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false)
+ val internalAccum =
+ AccumulableInfo(id = 1, name = Some("internal"), None, None, internal = true, false)
+ val sqlAccum = AccumulableInfo(
+ id = 2,
+ name = Some("sql"),
+ None,
+ None,
+ internal = false,
+ countFailedValues = false,
+ metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+ val userAccum = AccumulableInfo(
+ id = 3,
+ name = Some("user"),
+ None,
+ None,
+ internal = false,
+ countFailedValues = false,
+ metadata = None)
+ taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
+
+ val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
+ assert(newTaskInfo.accumulables === Seq(userAccum))
+ }
}