aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala
diff options
context:
space:
mode:
authorCarson Wang <carson.wang@intel.com>2017-02-23 14:31:16 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-23 14:31:16 -0800
commiteff7b40890f39617538d300df747277781a6f014 (patch)
tree85e34ed3bf702e3009098aa384a5846d9b2da11c /sql/core/src/main/scala
parentf87a6a59af5037c28d8b3c801c586b347f0ae10c (diff)
downloadspark-eff7b40890f39617538d300df747277781a6f014.tar.gz
spark-eff7b40890f39617538d300df747277781a6f014.tar.bz2
spark-eff7b40890f39617538d300df747277781a6f014.zip
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates
## What changes were proposed in this pull request? In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException. ## How was this patch tested? Updated unit test. Author: Carson Wang <carson.wang@intel.com> Closes #17009 from carsonwang/FixSQLMetrics.
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala7
1 files changed, 5 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5daf21595d..12d3bc9281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
(accumulatorUpdate._1, accumulatorUpdate._2)
}
- }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+ }
val driverUpdates = executionUIData.driverAccumUpdates.toSeq
- mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+ val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+ case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+ }
+ mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
executionUIData.accumulatorMetrics(accumulatorId).metricType)
case None =>
// This execution has been dropped