diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index 586c98b156..db3924cb69 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.InternalAccumulator +import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi @@ -39,20 +39,18 @@ object DataWriteMethod extends Enumeration with Serializable { */ @DeveloperApi class OutputMetrics private[spark] () extends Serializable { - import InternalAccumulator._ - - private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN) - private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN) + private[executor] val _bytesWritten = new LongAccumulator + private[executor] val _recordsWritten = new LongAccumulator /** * Total number of bytes written. */ - def bytesWritten: Long = _bytesWritten.localValue + def bytesWritten: Long = _bytesWritten.sum /** * Total number of records written. */ - def recordsWritten: Long = _recordsWritten.localValue + def recordsWritten: Long = _recordsWritten.sum private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v) private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v) |