diff options
author | Carson Wang <carson.wang@intel.com> | 2017-02-23 14:31:16 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-02-23 14:31:16 -0800 |
commit | eff7b40890f39617538d300df747277781a6f014 (patch) | |
tree | 85e34ed3bf702e3009098aa384a5846d9b2da11c /sql/core/src/main/scala | |
parent | f87a6a59af5037c28d8b3c801c586b347f0ae10c (diff) | |
download | spark-eff7b40890f39617538d300df747277781a6f014.tar.gz spark-eff7b40890f39617538d300df747277781a6f014.tar.bz2 spark-eff7b40890f39617538d300df747277781a6f014.zip |
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates
## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException.
## How was this patch tested?
Updated unit test.
Author: Carson Wang <carson.wang@intel.com>
Closes #17009 from carsonwang/FixSQLMetrics.
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 5daf21595d..12d3bc9281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { (accumulatorUpdate._1, accumulatorUpdate._2) } - }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } + } val driverUpdates = executionUIData.driverAccumUpdates.toSeq - mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId => + val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { + case (id, _) => executionUIData.accumulatorMetrics.contains(id) + } + mergeAccumulatorUpdates(totalUpdates, accumulatorId => executionUIData.accumulatorMetrics(accumulatorId).metricType) case None => // This execution has been dropped |