diff options
author | Carson Wang <carson.wang@intel.com> | 2017-02-23 14:31:16 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-02-23 14:31:16 -0800 |
commit | eff7b40890f39617538d300df747277781a6f014 (patch) | |
tree | 85e34ed3bf702e3009098aa384a5846d9b2da11c | |
parent | f87a6a59af5037c28d8b3c801c586b347f0ae10c (diff) | |
download | spark-eff7b40890f39617538d300df747277781a6f014.tar.gz spark-eff7b40890f39617538d300df747277781a6f014.tar.bz2 spark-eff7b40890f39617538d300df747277781a6f014.zip |
[SPARK-19674][SQL] Ignore driver accumulator updates that don't belong to the execution when merging all accumulator updates
## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates that don't belong to the execution should be ignored when merging all accumulator updates, to prevent a NoSuchElementException.
## How was this patch tested?
Updated unit test.
Author: Carson Wang <carson.wang@intel.com>
Closes #17009 from carsonwang/FixSQLMetrics.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 7 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 5 |
2 files changed, 10 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 5daf21595d..12d3bc9281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { (accumulatorUpdate._1, accumulatorUpdate._2) } - }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } + } val driverUpdates = executionUIData.driverAccumUpdates.toSeq - mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId => + val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { + case (id, _) => executionUIData.accumulatorMetrics.contains(id) + } + mergeAccumulatorUpdates(totalUpdates, accumulatorId => executionUIData.accumulatorMetrics(accumulatorId).metricType) case None => // This execution has been dropped diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 8aea112897..e41c00ecec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + // Driver accumulator updates don't belong to this execution should be filtered and no + // exception will be thrown. 
+ listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), |