aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala21
1 files changed, 21 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 50bb645d97..71a24770b5 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -116,4 +116,25 @@ class ShuffleReadMetrics private (
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+ /**
+ * Resets the value of the current metrics (`this`) and and merges all the independent
+ * [[ShuffleReadMetrics]] into `this`.
+ */
+ private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+ _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
+ _localBlocksFetched.setValue(_localBlocksFetched.zero)
+ _remoteBytesRead.setValue(_remoteBytesRead.zero)
+ _localBytesRead.setValue(_localBytesRead.zero)
+ _fetchWaitTime.setValue(_fetchWaitTime.zero)
+ _recordsRead.setValue(_recordsRead.zero)
+ metrics.foreach { metric =>
+ _remoteBlocksFetched.add(metric.remoteBlocksFetched)
+ _localBlocksFetched.add(metric.localBlocksFetched)
+ _remoteBytesRead.add(metric.remoteBytesRead)
+ _localBytesRead.add(metric.localBytesRead)
+ _fetchWaitTime.add(metric.fetchWaitTime)
+ _recordsRead.add(metric.recordsRead)
+ }
+ }
+
}