aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala12
1 files changed, 5 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 586c98b156..db3924cb69 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -39,20 +39,18 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
class OutputMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
- private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
- private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)
+ private[executor] val _bytesWritten = new LongAccumulator
+ private[executor] val _recordsWritten = new LongAccumulator
/**
* Total number of bytes written.
*/
- def bytesWritten: Long = _bytesWritten.localValue
+ def bytesWritten: Long = _bytesWritten.sum
/**
* Total number of records written.
*/
- def recordsWritten: Long = _recordsWritten.localValue
+ def recordsWritten: Long = _recordsWritten.sum
private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)