about summary refs log tree commit diff
path: root/sql/core
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala   |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 12
3 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index f82e0b8bca..786110477d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -66,7 +66,7 @@ private[sql] object SQLMetrics {
def createMetric(sc: SparkContext, name: String): SQLMetric = {
val acc = new SQLMetric(SUM_METRIC)
- acc.register(sc, name = Some(name), countFailedValues = true)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
@@ -79,7 +79,7 @@ private[sql] object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+ acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
acc
}
@@ -88,7 +88,7 @@ private[sql] object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+ acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
acc
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 29c54111ea..510a2ee3bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)),
+ taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
finishTask = true)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 5e08658e5e..67e44849ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui
import java.util.Properties
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.mock
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
@@ -74,13 +74,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
)
private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
- val metrics = mock(classOf[TaskMetrics])
- when(metrics.accumulators()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+ val metrics = TaskMetrics.empty
+ accumulatorUpdates.foreach { case (id, update) =>
val acc = new LongAccumulator
acc.metadata = AccumulatorMetadata(id, Some(""), true)
- acc.setValue(update)
- acc
- }.toSeq)
+ acc.add(update)
+ metrics.registerAccumulator(acc)
+ }
metrics
}