diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 21 |
1 files changed, 21 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 50bb645d97..71a24770b5 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -116,4 +116,25 @@ class ShuffleReadMetrics private ( private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v) private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v) + /** + * Resets the value of the current metrics (`this`) and and merges all the independent + * [[ShuffleReadMetrics]] into `this`. + */ + private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = { + _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero) + _localBlocksFetched.setValue(_localBlocksFetched.zero) + _remoteBytesRead.setValue(_remoteBytesRead.zero) + _localBytesRead.setValue(_localBytesRead.zero) + _fetchWaitTime.setValue(_fetchWaitTime.zero) + _recordsRead.setValue(_recordsRead.zero) + metrics.foreach { metric => + _remoteBlocksFetched.add(metric.remoteBlocksFetched) + _localBlocksFetched.add(metric.localBlocksFetched) + _remoteBytesRead.add(metric.remoteBytesRead) + _localBytesRead.add(metric.localBytesRead) + _fetchWaitTime.add(metric.fetchWaitTime) + _recordsRead.add(metric.recordsRead) + } + } + } |