diff options
Diffstat (limited to 'sql')
3 files changed, 10 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index f82e0b8bca..786110477d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -66,7 +66,7 @@ private[sql] object SQLMetrics { def createMetric(sc: SparkContext, name: String): SQLMetric = { val acc = new SQLMetric(SUM_METRIC) - acc.register(sc, name = Some(name), countFailedValues = true) + acc.register(sc, name = Some(name), countFailedValues = false) acc } @@ -79,7 +79,7 @@ private[sql] object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true) + acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc } @@ -88,7 +88,7 @@ private[sql] object SQLMetrics { // duration(min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true) + acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 29c54111ea..510a2ee3bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -164,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi taskEnd.taskInfo.taskId, taskEnd.stageId, taskEnd.stageAttemptId, - taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)), + taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)), finishTask = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 5e08658e5e..67e44849ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui import java.util.Properties -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.mock import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -74,13 +74,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { ) private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = { - val metrics = mock(classOf[TaskMetrics]) - when(metrics.accumulators()).thenReturn(accumulatorUpdates.map { case (id, update) => + val metrics = TaskMetrics.empty + accumulatorUpdates.foreach { case (id, update) => val acc = new LongAccumulator acc.metadata = AccumulatorMetadata(id, Some(""), true) - acc.setValue(update) - acc - }.toSeq) + acc.add(update) + metrics.registerAccumulator(acc) + } metrics } |