diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-05-19 12:05:17 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-19 12:05:17 -0700 |
commit | 4e3cb7a5d965fd490390398ecfe35f1fc05e8511 (patch) | |
tree | aa7b9e92c8196e6d02fbed6e8a5dd1ec3754de0c | |
parent | 6ac1c3a040f88fae15c46acd73e7e3691f7d3619 (diff) | |
download | spark-4e3cb7a5d965fd490390398ecfe35f1fc05e8511.tar.gz spark-4e3cb7a5d965fd490390398ecfe35f1fc05e8511.tar.bz2 spark-4e3cb7a5d965fd490390398ecfe35f1fc05e8511.zip |
[SPARK-15317][CORE] Don't store accumulators for every task in listeners
## What changes were proposed in this pull request?
In general, the Web UI doesn't need to store the Accumulator/AccumulableInfo for every task. It only needs the Accumulator values.
In this PR, it creates new UIData classes to store the necessary fields and make `JobProgressListener` store only these new classes, so that `JobProgressListener` won't store Accumulator/AccumulableInfo and the size of `JobProgressListener` becomes pretty small. I also eliminates `AccumulableInfo` from `SQLListener` so that we don't keep any references for those unused `AccumulableInfo`s.
## How was this patch tested?
I ran two tests reported in JIRA locally:
The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
The retained size of JobProgressListener decreases from 60.7M to 6.9M.
The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```
This test won't cause OOM after applying this patch.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13153 from zsxwing/memory.
11 files changed, 186 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index eddc36edc9..7d63a8f734 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -20,10 +20,10 @@ import java.util.{Arrays, Date, List => JList} import javax.ws.rs.{GET, Produces, QueryParam} import javax.ws.rs.core.MediaType -import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics} import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo} import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData} +import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, ShuffleReadMetricsUIData => InternalShuffleReadMetrics, ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => InternalTaskMetrics} import org.apache.spark.util.Distribution @Produces(Array(MediaType.APPLICATION_JSON)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 945830c8bf..842f42b4c9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -332,7 +332,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { new StageUIData }) stageData.numActiveTasks += 1 - stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics))) + stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics))) } for ( activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId); @@ -395,9 +395,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) } - val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info)) - taskData.taskInfo = info - taskData.metrics = taskMetrics + val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None)) + taskData.updateTaskInfo(info) + taskData.updateTaskMetrics(taskMetrics) taskData.errorMessage = errorMessage for ( @@ -425,7 +425,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData: StageUIData, execId: String, taskMetrics: TaskMetrics, - oldMetrics: Option[TaskMetrics]) { + oldMetrics: Option[TaskMetricsUIData]) { val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary) val shuffleWriteDelta = @@ -503,7 +503,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { if (!t.taskInfo.finished) { updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics) // Overwrite task metrics - t.metrics = Some(metrics) + t.updateTaskMetrics(Some(metrics)) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 6ddabfd8ef..d986a55959 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -768,7 +768,7 @@ private[ui] object StagePage { } private[ui] def getSchedulerDelay( - info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = { + info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = { if (info.finished) { val totalExecutionTime = info.finishTime - info.launchTime val executorOverhead = (metrics.executorDeserializeTime + diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index b454ef1b20..d76a0e657c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -21,8 +21,10 @@ import scala.collection.mutable import scala.collection.mutable.HashMap import org.apache.spark.JobExecutionStatus -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} +import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.util.AccumulatorContext import org.apache.spark.util.collection.OpenHashSet private[spark] object UIData { @@ -105,13 +107,137 @@ private[spark] object UIData { /** * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation. */ - class TaskUIData( - var taskInfo: TaskInfo, - var metrics: Option[TaskMetrics] = None, - var errorMessage: Option[String] = None) + class TaskUIData private( + private var _taskInfo: TaskInfo, + private var _metrics: Option[TaskMetricsUIData]) { + + var errorMessage: Option[String] = None + + def taskInfo: TaskInfo = _taskInfo + + def metrics: Option[TaskMetricsUIData] = _metrics + + def updateTaskInfo(taskInfo: TaskInfo): Unit = { + _taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) + } + + def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = { + _metrics = TaskUIData.toTaskMetricsUIData(metrics) + } + } + + object TaskUIData { + def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = { + new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics)) + } + + private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = { + metrics.map { m => + TaskMetricsUIData( + executorDeserializeTime = m.executorDeserializeTime, + executorRunTime = m.executorRunTime, + resultSize = m.resultSize, + jvmGCTime = m.jvmGCTime, + resultSerializationTime = m.resultSerializationTime, + memoryBytesSpilled = m.memoryBytesSpilled, + diskBytesSpilled = m.diskBytesSpilled, + peakExecutionMemory = m.peakExecutionMemory, + updatedBlockStatuses = m.updatedBlockStatuses.toList, + inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead), + outputMetrics = + OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten), + shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics), + shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics)) + } + } + + /** + * We don't need to store internal or SQL accumulables as their values will be shown in other + * places, so drop them to reduce the memory usage. + */ + private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = { + val newTaskInfo = new TaskInfo( + taskId = taskInfo.taskId, + index = taskInfo.index, + attemptNumber = taskInfo.attemptNumber, + launchTime = taskInfo.launchTime, + executorId = taskInfo.executorId, + host = taskInfo.host, + taskLocality = taskInfo.taskLocality, + speculative = taskInfo.speculative + ) + newTaskInfo.gettingResultTime = taskInfo.gettingResultTime + newTaskInfo.accumulables ++= taskInfo.accumulables.filter { + accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) + } + newTaskInfo.finishTime = taskInfo.finishTime + newTaskInfo.failed = taskInfo.failed + newTaskInfo + } + } class ExecutorUIData( val startTime: Long, var finishTime: Option[Long] = None, var finishReason: Option[String] = None) + + case class TaskMetricsUIData( + executorDeserializeTime: Long, + executorRunTime: Long, + resultSize: Long, + jvmGCTime: Long, + resultSerializationTime: Long, + memoryBytesSpilled: Long, + diskBytesSpilled: Long, + peakExecutionMemory: Long, + updatedBlockStatuses: Seq[(BlockId, BlockStatus)], + inputMetrics: InputMetricsUIData, + outputMetrics: OutputMetricsUIData, + shuffleReadMetrics: ShuffleReadMetricsUIData, + shuffleWriteMetrics: ShuffleWriteMetricsUIData) + + case class InputMetricsUIData(bytesRead: Long, recordsRead: Long) + + case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long) + + case class ShuffleReadMetricsUIData( + remoteBlocksFetched: Long, + localBlocksFetched: Long, + remoteBytesRead: Long, + localBytesRead: Long, + fetchWaitTime: Long, + recordsRead: Long, + totalBytesRead: Long, + totalBlocksFetched: Long) + + object ShuffleReadMetricsUIData { + def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = { + new ShuffleReadMetricsUIData( + remoteBlocksFetched = metrics.remoteBlocksFetched, + localBlocksFetched = metrics.localBlocksFetched, + remoteBytesRead = metrics.remoteBytesRead, + localBytesRead = metrics.localBytesRead, + fetchWaitTime = metrics.fetchWaitTime, + recordsRead = metrics.recordsRead, + totalBytesRead = metrics.totalBytesRead, + totalBlocksFetched = metrics.totalBlocksFetched + ) + } + } + + case class ShuffleWriteMetricsUIData( + bytesWritten: Long, + recordsWritten: Long, + writeTime: Long) + + object ShuffleWriteMetricsUIData { + def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = { + new ShuffleWriteMetricsUIData( + bytesWritten = metrics.bytesWritten, + recordsWritten = metrics.recordsWritten, + writeTime = metrics.writeTime + ) + } + } + } diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0cf9df084f..13cb6a28c3 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -255,6 +255,9 @@ private[spark] object AccumulatorContext { def clear(): Unit = { originals.clear() } + + // Identifier for distinguishing SQL metrics from other accumulators + private[spark] val SQL_ACCUM_IDENTIFIER = "sql" } diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala index d223af1496..f684e16c25 100644 --- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala @@ -30,8 +30,8 @@ class AllStagesResourceSuite extends SparkFunSuite { def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = { val tasks = new HashMap[Long, TaskUIData] taskLaunchTimes.zipWithIndex.foreach { case (time, idx) => - tasks(idx.toLong) = new TaskUIData( - new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None) + tasks(idx.toLong) = TaskUIData( + new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None) } val stageUiData = new StageUIData() diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index ce7d51d1c3..6f7c9f282a 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -25,7 +25,8 @@ import org.apache.spark._ import org.apache.spark.{LocalSparkContext, SparkConf, Success} import org.apache.spark.executor._ import org.apache.spark.scheduler._ -import org.apache.spark.util.Utils +import org.apache.spark.ui.jobs.UIData.TaskUIData +import org.apache.spark.util.{AccumulatorContext, Utils} class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers { @@ -359,4 +360,30 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with assert( stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402) } + + test("drop internal and sql accumulators") { + val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false) + val internalAccum = + AccumulableInfo(id = 1, name = Some("internal"), None, None, internal = true, false) + val sqlAccum = AccumulableInfo( + id = 2, + name = Some("sql"), + None, + None, + internal = false, + countFailedValues = false, + metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) + val userAccum = AccumulableInfo( + id = 3, + name = Some("user"), + None, + None, + internal = false, + countFailedValues = false, + metadata = None) + taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum) + + val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) + assert(newTaskInfo.accumulables === Seq(userAccum)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index d6de15494f..e63c7c581e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -21,7 +21,7 @@ import java.text.NumberFormat import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.util.{AccumulatorV2, Utils} +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] { @@ -56,15 +56,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - new AccumulableInfo(id, name, update, value, true, true, Some(SQLMetrics.ACCUM_IDENTIFIER)) + new AccumulableInfo( + id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } } private[sql] object SQLMetrics { - // Identifier for distinguishing SQL metrics from other accumulators - private[sql] val ACCUM_IDENTIFIER = "sql" - private[sql] val SUM_METRIC = "sum" private[sql] val SIZE_METRIC = "size" private[sql] val TIMING_METRIC = "timing" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 510a2ee3bf..03b532664a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -26,6 +26,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.AccumulatorContext @DeveloperApi case class SparkListenerSQLExecutionStart( @@ -177,8 +178,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi taskId: Long, stageId: Int, stageAttemptID: Int, - accumulatorUpdates: Seq[AccumulableInfo], + _accumulatorUpdates: Seq[AccumulableInfo], finishTask: Boolean): Unit = { + val accumulatorUpdates = + _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get)) _stageIdToStageMetrics.get(stageId) match { case Some(stageMetrics) => @@ -290,9 +293,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable; taskMetrics <- stageMetrics.taskIdToMetricUpdates.values; accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { - assert(accumulatorUpdate.update.isDefined, s"accumulator update from " + - s"task did not have a partial value: ${accumulatorUpdate.name}") - (accumulatorUpdate.id, accumulatorUpdate.update.get) + (accumulatorUpdate._1, accumulatorUpdate._2) } }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId => @@ -336,7 +337,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) taskEnd.taskInfo.accumulables.flatMap { a => // Filter out accumulators that are not SQL metrics // For now we assume all SQL metrics are Long's that have been JSON serialized as String's - if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) { + if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) { val newValue = a.update.map(_.toString.toLong).getOrElse(0L) Some(a.copy(update = Some(newValue))) } else { @@ -418,4 +419,4 @@ private[ui] class SQLStageMetrics( private[ui] class SQLTaskMetrics( val attemptId: Long, // TODO not used yet var finished: Boolean, - var accumulatorUpdates: Seq[AccumulableInfo]) + var accumulatorUpdates: Seq[(Long, Any)]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 08f596f130..7a89b484eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -29,9 +29,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.util.{JsonProtocol, Utils} +import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils} class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { @@ -308,7 +307,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}") case _ => fail("metric update is missing") } - assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER)) + assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) // After serializing to JSON, the original value type is lost, but we can still // identify that it's a SQL metric from the metadata val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo) @@ -318,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}") case _ => fail("deserialized metric update is missing") } - assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER)) + assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 9eff42ab2d..1c467137ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -383,7 +383,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { } // Listener tracks only SQL metrics, not other accumulators assert(trackedAccums.size === 1) - assert(trackedAccums.head === sqlMetricInfo) + assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get)) } } |