author    Shixiong Zhu <shixiong@databricks.com>   2016-05-19 12:05:17 -0700
committer Andrew Or <andrew@databricks.com>        2016-05-19 12:05:17 -0700
commit    4e3cb7a5d965fd490390398ecfe35f1fc05e8511 (patch)
tree      aa7b9e92c8196e6d02fbed6e8a5dd1ec3754de0c /sql
parent    6ac1c3a040f88fae15c46acd73e7e3691f7d3619 (diff)
[SPARK-15317][CORE] Don't store accumulators for every task in listeners
## What changes were proposed in this pull request?
In general, the Web UI doesn't need to store an Accumulator/AccumulableInfo object for every task; it only needs the accumulator values.
This PR adds new `UIData` classes that store only the necessary fields, and makes `JobProgressListener` store these classes instead, so that `JobProgressListener` no longer holds Accumulator/AccumulableInfo references and its retained size becomes quite small. It also eliminates `AccumulableInfo` from `SQLListener`, so we don't keep references to unused `AccumulableInfo`s.
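The idea can be sketched in a few lines (a simplified illustration, not the patch itself: the real `AccumulableInfo` in `org.apache.spark.scheduler` carries more fields, and the listener stores these pairs inside its `UIData` classes):
```
// Simplified stand-in for org.apache.spark.scheduler.AccumulableInfo.
case class AccumulableInfo(id: Long, name: Option[String], update: Option[Any], value: Option[Any])

// Keep only the (accumulator id, partial value) pairs the UI needs,
// dropping the name, metadata, and other fields retained before this patch.
def compactUpdates(infos: Seq[AccumulableInfo]): Seq[(Long, Any)] =
  infos.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
```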
## How was this patch tested?
I ran the two tests reported in the JIRA ticket locally:
The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
Because the job runs 10000 tasks, the listener previously retained accumulator info for every one of them. The retained size of `JobProgressListener` decreases from 60.7M to 6.9M.
The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```
After applying this patch, this test no longer causes an OOM.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13153 from zsxwing/memory.
Diffstat (limited to 'sql')
4 files changed, 14 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index d6de15494f..e63c7c581e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
 
 class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
@@ -56,15 +56,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(id, name, update, value, true, true, Some(SQLMetrics.ACCUM_IDENTIFIER))
+    new AccumulableInfo(
+      id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
   }
 }
 
 private[sql] object SQLMetrics {
-  // Identifier for distinguishing SQL metrics from other accumulators
-  private[sql] val ACCUM_IDENTIFIER = "sql"
-
   private[sql] val SUM_METRIC = "sum"
   private[sql] val SIZE_METRIC = "size"
   private[sql] val TIMING_METRIC = "timing"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 510a2ee3bf..03b532664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.AccumulatorContext
 
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
@@ -177,8 +178,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      accumulatorUpdates: Seq[AccumulableInfo],
+      _accumulatorUpdates: Seq[AccumulableInfo],
       finishTask: Boolean): Unit = {
+    val accumulatorUpdates =
+      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
 
     _stageIdToStageMetrics.get(stageId) match {
       case Some(stageMetrics) =>
@@ -290,9 +293,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
         taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
         accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
-        assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
-          s"task did not have a partial value: ${accumulatorUpdate.name}")
-        (accumulatorUpdate.id, accumulatorUpdate.update.get)
+        (accumulatorUpdate._1, accumulatorUpdate._2)
       }
     }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
     mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -336,7 +337,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
     taskEnd.taskInfo.accumulables.flatMap { a =>
       // Filter out accumulators that are not SQL metrics
       // For now we assume all SQL metrics are Long's that have been JSON serialized as String's
-      if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) {
+      if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) {
         val newValue = a.update.map(_.toString.toLong).getOrElse(0L)
         Some(a.copy(update = Some(newValue)))
       } else {
@@ -418,4 +419,4 @@ private[ui] class SQLStageMetrics(
 private[ui] class SQLTaskMetrics(
     val attemptId: Long, // TODO not used yet
     var finished: Boolean,
-    var accumulatorUpdates: Seq[AccumulableInfo])
+    var accumulatorUpdates: Seq[(Long, Any)])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 08f596f130..7a89b484eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
 
 class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -308,7 +307,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}")
       case _ => fail("metric update is missing")
     }
-    assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+    assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
     // After serializing to JSON, the original value type is lost, but we can still
     // identify that it's a SQL metric from the metadata
     val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
@@ -318,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
       case _ => fail("deserialized metric update is missing")
     }
-    assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+    assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 9eff42ab2d..1c467137ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -383,7 +383,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     }
     // Listener tracks only SQL metrics, not other accumulators
     assert(trackedAccums.size === 1)
-    assert(trackedAccums.head === sqlMetricInfo)
+    assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
   }
 }
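For context on the `SQLHistoryListener` hunk above: SQL metrics are told apart from other accumulators by a metadata tag (the identifier string `"sql"`, which this patch moves from `SQLMetrics.ACCUM_IDENTIFIER` to `AccumulatorContext.SQL_ACCUM_IDENTIFIER`). A simplified, self-contained sketch of that filter, with types pared down to just what the check needs:
```
// Pared-down stand-in for AccumulableInfo; the real class has more fields.
case class AccumInfo(id: Long, update: Option[Any], metadata: Option[String])

val SQL_ACCUM_IDENTIFIER = "sql" // the identifier value from the patch

// Keep only accumulators tagged as SQL metrics, coercing their
// JSON-serialized String updates back to Long (0L if the update is absent).
def sqlMetricUpdates(accums: Seq[AccumInfo]): Seq[AccumInfo] =
  accums.flatMap { a =>
    if (a.metadata == Some(SQL_ACCUM_IDENTIFIER)) {
      Some(a.copy(update = Some(a.update.map(_.toString.toLong).getOrElse(0L))))
    } else {
      None
    }
  }
```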