From 8028a28885dbd90f20e38922240618fc310a0a65 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 15 Apr 2016 15:39:39 -0700 Subject: [SPARK-14628][CORE] Simplify task metrics by always tracking read/write metrics ## What changes were proposed in this pull request? Part of the reason why TaskMetrics and its callers are complicated are due to the optional metrics we collect, including input, output, shuffle read, and shuffle write. I think we can always track them and just assign 0 as the initial values. It is usually very obvious whether a task is supposed to read any data or not. By always tracking them, we can remove a lot of map, foreach, flatMap, getOrElse(0L) calls throughout Spark. This patch also changes a few behaviors. 1. Removed the distinction of data read/write methods (e.g. Hadoop, Memory, Network, etc). 2. Accumulate all data reads and writes, rather than only the first method. (Fixes SPARK-5225) ## How was this patch tested? existing tests. This is bases on https://github.com/apache/spark/pull/12388, with more test fixes. Author: Reynold Xin Author: Wenchen Fan Closes #12417 from cloud-fan/metrics-refactor. --- .../org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 2 +- .../main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2 +- .../apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'core/src/main/java') diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 7a60c3eb35..0e9defe5b4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 0c5fb883a8..daa63d47e6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -118,7 +118,7 @@ public class UnsafeShuffleWriter extends ShuffleWriter { this.shuffleId = dep.shuffleId(); this.serializer = dep.serializer().newInstance(); this.partitioner = dep.partitioner(); - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ef79b49083..3e32dd9d63 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -129,7 +129,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.fileBufferSizeBytes = 32 * 1024; - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); if (existingInMemorySorter == null) { this.inMemSorter = new UnsafeInMemorySorter( -- cgit v1.2.3