-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                          |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala                      |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala              | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala     |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala        |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala   | 12
7 files changed, 51 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7f4652c2dd..1893167cf7 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -218,7 +218,7 @@ class TaskMetrics private[spark] () extends Serializable {
/**
* External accumulators registered with this task.
*/
- @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+ @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
externalAccums += a
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 95bcc7bc96..15f863b66c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -155,7 +155,14 @@ private[spark] abstract class Task[T](
*/
def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
if (context != null) {
- context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
+ context.taskMetrics.internalAccums.filter { a =>
+ // The RESULT_SIZE accumulator is always zero at the executor; we need to send it back
+ // because its value is only filled in on the driver side.
+ // Note: internal accumulators representing task metrics always count failed values.
+ !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
+ // Zero-valued external accumulators may still be useful, e.g. SQLMetrics, so we should
+ // not filter them out.
+ } ++ context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
} else {
Seq.empty
}
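
To illustrate the intent of the new filtering in collectAccumulatorUpdates, here is a minimal, self-contained sketch. SimpleAccum, selectUpdates and the sample metric names are hypothetical stand-ins, not Spark's AccumulatorV2 API: internal accumulators are skipped while still zero (except RESULT_SIZE), and external accumulators are dropped on failure only when they do not count failed values.

// Hypothetical stand-in for AccumulatorV2, for illustration only; not Spark's API.
case class SimpleAccum(name: Option[String], isZero: Boolean, countFailedValues: Boolean)

object CollectUpdatesSketch {
  val RESULT_SIZE = "internal.metrics.resultSize"

  // Mirrors the filtering rule added to Task.collectAccumulatorUpdates in the hunk above.
  def selectUpdates(
      internal: Seq[SimpleAccum],
      external: Seq[SimpleAccum],
      taskFailed: Boolean): Seq[SimpleAccum] = {
    internal.filter { a =>
      // Zero-valued internal accumulators are skipped, except RESULT_SIZE,
      // whose value is only filled in on the driver side.
      !a.isZero || a.name == Some(RESULT_SIZE)
    } ++ external.filter(a => !taskFailed || a.countFailedValues)
  }

  def main(args: Array[String]): Unit = {
    val internal = Seq(
      SimpleAccum(Some(RESULT_SIZE), isZero = true, countFailedValues = true),
      SimpleAccum(Some("internal.metrics.memoryBytesSpilled"), isZero = false, countFailedValues = true),
      SimpleAccum(Some("internal.metrics.diskBytesSpilled"), isZero = true, countFailedValues = true))
    val external = Seq(
      SimpleAccum(Some("my sql metric"), isZero = true, countFailedValues = false))

    // Success: RESULT_SIZE, memoryBytesSpilled and the external metric are reported.
    println(selectUpdates(internal, external, taskFailed = false).flatMap(_.name))
    // Failure: the external metric is dropped because it does not count failed values.
    println(selectUpdates(internal, external, taskFailed = true).flatMap(_.name))
  }
}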
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d8f380e123..c4879036f6 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -256,7 +256,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
* @since 2.0.0
*/
- override def isZero: Boolean = _count == 0L
+ override def isZero: Boolean = _sum == 0L && _count == 0
override def copyAndReset(): LongAccumulator = new LongAccumulator
@@ -321,7 +321,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private[this] var _sum = 0.0
private[this] var _count = 0L
- override def isZero: Boolean = _count == 0L
+ override def isZero: Boolean = _sum == 0.0 && _count == 0
override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
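
A minimal sketch of why isZero now inspects both fields. MiniLongAccum below is a hypothetical miniature, not Spark's LongAccumulator: a direct write to the sum (like the setValue call visible in the old SQLListenerSuite code further down) leaves the count at zero, so the old count-only check would still report the accumulator as zero.

// Hypothetical miniature of a sum/count accumulator, for illustration only.
class MiniLongAccum {
  private var _sum = 0L
  private var _count = 0L

  def add(v: Long): Unit = { _sum += v; _count += 1 }
  // A direct write to the sum bypasses the count.
  def setValue(v: Long): Unit = { _sum = v }

  def isZeroOld: Boolean = _count == 0L                // before the patch
  def isZeroNew: Boolean = _sum == 0L && _count == 0   // after the patch
}

object IsZeroDemo extends App {
  val acc = new MiniLongAccum
  acc.setValue(42)
  // The old check misses the direct write; the new one does not.
  println(acc.isZeroOld) // true
  println(acc.isZeroNew) // false
}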
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9aca4dbc23..368668bc7e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -168,8 +168,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
- val acc1 = AccumulatorSuite.createLongAccum("x", true)
- val acc2 = AccumulatorSuite.createLongAccum("y", false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", false)
+ val acc2 = AccumulatorSuite.createLongAccum("y", true)
+ acc1.add(1)
+ acc2.add(1)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val taskMetrics = TaskMetrics.empty
@@ -185,12 +187,33 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
- TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates1.takeRight(2), Seq(acc1, acc2))
// Now, simulate task failures. This should give us only the accums that count failed values.
- val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
- TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+ val accumUpdates2 = task.collectAccumulatorUpdates(taskFailed = true)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
+ }
+
+ test("only updated internal accumulators will be sent back to driver") {
+ sc = new SparkContext("local", "test")
+ // Create a dummy task. We won't end up running this; we just want to collect
+ // accumulator updates from it.
+ val taskMetrics = TaskMetrics.empty
+ val task = new Task[Int](0, 0, 0) {
+ context = new TaskContextImpl(0, 0, 0L, 0,
+ new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+ new Properties,
+ SparkEnv.get.metricsSystem,
+ taskMetrics)
+ taskMetrics.incMemoryBytesSpilled(10)
+ override def runTask(tc: TaskContext): Int = 0
+ }
+ val updatedAccums = task.collectAccumulatorUpdates()
+ assert(updatedAccums.length == 2)
+ // The RESULT_SIZE accumulator is sent back even though its value is still zero.
+ assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+ assert(updatedAccums(0).value == 0)
+ assert(updatedAccums(1).name == Some(InternalAccumulator.MEMORY_BYTES_SPILLED))
+ assert(updatedAccums(1).value == 10)
}
test("localProperties are propagated to executors correctly") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index f82e0b8bca..786110477d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -66,7 +66,7 @@ private[sql] object SQLMetrics {
def createMetric(sc: SparkContext, name: String): SQLMetric = {
val acc = new SQLMetric(SUM_METRIC)
- acc.register(sc, name = Some(name), countFailedValues = true)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
@@ -79,7 +79,7 @@ private[sql] object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+ acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
acc
}
@@ -88,7 +88,7 @@ private[sql] object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+ acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
acc
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 29c54111ea..510a2ee3bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)),
+ taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
finishTask = true)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 5e08658e5e..67e44849ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui
import java.util.Properties
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.mock
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
@@ -74,13 +74,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
)
private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
- val metrics = mock(classOf[TaskMetrics])
- when(metrics.accumulators()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+ val metrics = TaskMetrics.empty
+ accumulatorUpdates.foreach { case (id, update) =>
val acc = new LongAccumulator
acc.metadata = AccumulatorMetadata(id, Some(""), true)
- acc.setValue(update)
- acc
- }.toSeq)
+ acc.add(update)
+ metrics.registerAccumulator(acc)
+ }
metrics
}