Diffstat (limited to 'sql/core')
 sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala      | 7 +++++--
 sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 5 +++++
 2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5daf21595d..12d3bc9281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
(accumulatorUpdate._1, accumulatorUpdate._2)
}
- }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+ }
val driverUpdates = executionUIData.driverAccumUpdates.toSeq
- mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+ val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+ case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+ }
+ mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
executionUIData.accumulatorMetrics(accumulatorId).metricType)
case None =>
// This execution has been dropped
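The gist of the hunk above: previously only task-side accumulator updates were filtered against executionUIData.accumulatorMetrics, so a driver accumulator update whose ID was never registered with the execution reached the accumulatorMetrics(accumulatorId) lookup inside mergeAccumulatorUpdates and threw a NoSuchElementException. Filtering the concatenated sequence closes that hole. Below is a minimal, self-contained sketch of the before/after behavior; knownMetrics, mergeUpdates, and FilterOrderSketch are simplified stand-ins, not the real SQLListener internals.

// Sketch only: simplified stand-ins for the SQLListener merge path.
object FilterOrderSketch {
  // Metric types for the accumulators this execution knows about.
  val knownMetrics: Map[Long, String] = Map(1L -> "sum", 2L -> "sum")

  // Simplified merge: sums updates per accumulator id, looking up each id's
  // metric type the way mergeAccumulatorUpdates does.
  def mergeUpdates(updates: Seq[(Long, Long)], metricType: Long => String): Map[Long, Long] =
    updates.groupBy(_._1).map { case (id, values) =>
      // An unfiltered, unknown id (e.g. 999L) would throw NoSuchElementException here.
      assert(metricType(id) == "sum")
      id -> values.map(_._2).sum
    }

  def main(args: Array[String]): Unit = {
    // Task-side updates were already filtered before this change.
    val taskUpdates = Seq((1L, 10L), (2L, 20L))
      .filter { case (id, _) => knownMetrics.contains(id) }
    // Driver update 999L doesn't belong to this execution.
    val driverUpdates = Seq((2L, 5L), (999L, 2L))

    // Old code: mergeUpdates(taskUpdates ++ driverUpdates, knownMetrics) crashes on 999L.
    // New code: filter the concatenated sequence first, then merge safely.
    val totalUpdates = (taskUpdates ++ driverUpdates)
      .filter { case (id, _) => knownMetrics.contains(id) }
    println(mergeUpdates(totalUpdates, knownMetrics)) // Map(1 -> 10, 2 -> 25)
  }
}

Filtering once on the concatenated sequence also keeps task and driver updates on a single code path instead of guarding each source separately.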
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 8aea112897..e41c00ecec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ // Driver accumulator updates that don't belong to this execution should be
+ // filtered out, and no exception should be thrown.
+ listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+ checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),