From eff7b40890f39617538d300df747277781a6f014 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Thu, 23 Feb 2017 14:31:16 -0800 Subject: [SPARK-19674][SQL] Ignore driver accumulator updates that don't belong to the execution when merging all accumulator updates ## What changes were proposed in this pull request? In SQLListener.getExecutionMetrics, driver accumulator updates that don't belong to the execution should be ignored when merging all accumulator updates, to prevent a NoSuchElementException. ## How was this patch tested? Updated unit test. Author: Carson Wang Closes #17009 from carsonwang/FixSQLMetrics. --- .../main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 7 +++++-- .../scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 5daf21595d..12d3bc9281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { (accumulatorUpdate._1, accumulatorUpdate._2) } - }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } + } val driverUpdates = executionUIData.driverAccumUpdates.toSeq - mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId => + val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { + case (id, _) => executionUIData.accumulatorMetrics.contains(id) + } + mergeAccumulatorUpdates(totalUpdates, accumulatorId => executionUIData.accumulatorMetrics(accumulatorId).metricType) case None => // This execution has been dropped diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 8aea112897..e41c00ecec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + // Driver accumulator updates that don't belong to this execution should be filtered and no + // exception should be thrown. + listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), -- cgit v1.2.3