author     Shixiong Zhu <shixiong@databricks.com>    2016-05-19 12:05:17 -0700
committer  Andrew Or <andrew@databricks.com>    2016-05-19 12:05:17 -0700
commit     4e3cb7a5d965fd490390398ecfe35f1fc05e8511 (patch)
tree       aa7b9e92c8196e6d02fbed6e8a5dd1ec3754de0c /sql
parent     6ac1c3a040f88fae15c46acd73e7e3691f7d3619 (diff)
[SPARK-15317][CORE] Don't store accumulators for every task in listeners
## What changes were proposed in this pull request?

In general, the Web UI doesn't need to store the Accumulator/AccumulableInfo for every task; it only needs the accumulator values. This PR creates new UIData classes that store just the necessary fields and makes `JobProgressListener` store only these new classes, so that `JobProgressListener` no longer retains Accumulator/AccumulableInfo and its retained size becomes pretty small. It also eliminates `AccumulableInfo` from `SQLListener`, so that we don't keep any references to those unused `AccumulableInfo`s.

## How was this patch tested?

I ran the two tests reported in JIRA locally. The first one is:

```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```

The retained size of JobProgressListener decreases from 60.7M to 6.9M.

The second one is:

```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```

This test no longer causes an OOM after applying this patch.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13153 from zsxwing/memory.
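For readers skimming the diff below, here is a minimal sketch of the core idea in the `SQLListener` change: instead of retaining whole `AccumulableInfo` objects per task, the listener keeps only lightweight `(accumulatorId, update)` pairs for accumulators that actually reported a partial value. The `AccumulableInfo` case class below is a simplified stand-in for `org.apache.spark.scheduler.AccumulableInfo`, and `AccumulatorSlimming`/`slim` are hypothetical names used only for illustration.

```scala
// Simplified stand-in for org.apache.spark.scheduler.AccumulableInfo;
// only the fields relevant to this change are modeled here.
case class AccumulableInfo(id: Long, name: Option[String], update: Option[Any])

object AccumulatorSlimming {
  // Keep only the (id, update) pairs the SQL UI actually needs, dropping
  // accumulators that did not report a partial value for this task.
  def slim(infos: Seq[AccumulableInfo]): Seq[(Long, Any)] =
    infos.filter(_.update.isDefined).map(a => (a.id, a.update.get))

  def main(args: Array[String]): Unit = {
    val infos = Seq(
      AccumulableInfo(1L, Some("number of output rows"), Some(42L)),
      AccumulableInfo(2L, Some("no partial value"), None))
    // Only the first accumulator survives, as a lightweight pair.
    println(slim(infos)) // List((1,42))
  }
}
```

Storing pairs rather than full `AccumulableInfo` objects is what shrinks the listener's retained size, since the heavyweight metadata (names, metadata strings, etc.) is no longer referenced per task.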
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala       8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala         13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala     2
4 files changed, 14 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index d6de15494f..e63c7c581e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
@@ -56,15 +56,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, true, true, Some(SQLMetrics.ACCUM_IDENTIFIER))
+ new AccumulableInfo(
+ id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
}
}
private[sql] object SQLMetrics {
- // Identifier for distinguishing SQL metrics from other accumulators
- private[sql] val ACCUM_IDENTIFIER = "sql"
-
private[sql] val SUM_METRIC = "sum"
private[sql] val SIZE_METRIC = "size"
private[sql] val TIMING_METRIC = "timing"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 510a2ee3bf..03b532664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.AccumulatorContext
@DeveloperApi
case class SparkListenerSQLExecutionStart(
@@ -177,8 +178,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskId: Long,
stageId: Int,
stageAttemptID: Int,
- accumulatorUpdates: Seq[AccumulableInfo],
+ _accumulatorUpdates: Seq[AccumulableInfo],
finishTask: Boolean): Unit = {
+ val accumulatorUpdates =
+ _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
_stageIdToStageMetrics.get(stageId) match {
case Some(stageMetrics) =>
@@ -290,9 +293,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
- assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
- s"task did not have a partial value: ${accumulatorUpdate.name}")
- (accumulatorUpdate.id, accumulatorUpdate.update.get)
+ (accumulatorUpdate._1, accumulatorUpdate._2)
}
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -336,7 +337,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
taskEnd.taskInfo.accumulables.flatMap { a =>
// Filter out accumulators that are not SQL metrics
// For now we assume all SQL metrics are Long's that have been JSON serialized as String's
- if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) {
+ if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) {
val newValue = a.update.map(_.toString.toLong).getOrElse(0L)
Some(a.copy(update = Some(newValue)))
} else {
@@ -418,4 +419,4 @@ private[ui] class SQLStageMetrics(
private[ui] class SQLTaskMetrics(
val attemptId: Long, // TODO not used yet
var finished: Boolean,
- var accumulatorUpdates: Seq[AccumulableInfo])
+ var accumulatorUpdates: Seq[(Long, Any)])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 08f596f130..7a89b484eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -308,7 +307,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}")
case _ => fail("metric update is missing")
}
- assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+ assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
// After serializing to JSON, the original value type is lost, but we can still
// identify that it's a SQL metric from the metadata
val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
@@ -318,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
case _ => fail("deserialized metric update is missing")
}
- assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+ assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 9eff42ab2d..1c467137ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -383,7 +383,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
// Listener tracks only SQL metrics, not other accumulators
assert(trackedAccums.size === 1)
- assert(trackedAccums.head === sqlMetricInfo)
+ assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
}
}