aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-15 15:39:39 -0700
committerReynold Xin <rxin@databricks.com>2016-04-15 15:39:39 -0700
commit8028a28885dbd90f20e38922240618fc310a0a65 (patch)
tree2a303488b198fdb417af37cfa6ad981b988f94ee /core/src/main/java
parent90b46e014a60069bd18754b02fce056d8f4d1b3e (diff)
downloadspark-8028a28885dbd90f20e38922240618fc310a0a65.tar.gz
spark-8028a28885dbd90f20e38922240618fc310a0a65.tar.bz2
spark-8028a28885dbd90f20e38922240618fc310a0a65.zip
[SPARK-14628][CORE] Simplify task metrics by always tracking read/write metrics
## What changes were proposed in this pull request? Part of the reason why TaskMetrics and its callers are complicated are due to the optional metrics we collect, including input, output, shuffle read, and shuffle write. I think we can always track them and just assign 0 as the initial values. It is usually very obvious whether a task is supposed to read any data or not. By always tracking them, we can remove a lot of map, foreach, flatMap, getOrElse(0L) calls throughout Spark. This patch also changes a few behaviors. 1. Removed the distinction of data read/write methods (e.g. Hadoop, Memory, Network, etc). 2. Accumulate all data reads and writes, rather than only the first method. (Fixes SPARK-5225) ## How was this patch tested? existing tests. This is bases on https://github.com/apache/spark/pull/12388, with more test fixes. Author: Reynold Xin <rxin@databricks.com> Author: Wenchen Fan <wenchen@databricks.com> Closes #12417 from cloud-fan/metrics-refactor.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java2
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java2
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java2
3 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 7a60c3eb35..0e9defe5b4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 0c5fb883a8..daa63d47e6 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -118,7 +118,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ef79b49083..3e32dd9d63 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -129,7 +129,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(