aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-01-19 10:58:51 -0800
committerAndrew Or <andrew@databricks.com>2016-01-19 10:58:51 -0800
commitb122c861cd72b580334a7532f0a52c0439552bdf (patch)
tree9a3c7e9f3b1c190c7914c89e061c7fa15911a846 /core/src/main/java
parente14817b528ccab4b4685b45a95e2325630b5fc53 (diff)
downloadspark-b122c861cd72b580334a7532f0a52c0439552bdf.tar.gz
spark-b122c861cd72b580334a7532f0a52c0439552bdf.tar.bz2
spark-b122c861cd72b580334a7532f0a52c0439552bdf.zip
[SPARK-12887] Do not expose var's in TaskMetrics
This is a step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. TaskMetrics has a bunch of var's, some are fully public, some are `private[spark]`. This is bad coding style that makes it easy to accidentally overwrite previously set metrics. This has happened a few times in the past and caused bugs that were difficult to debug. Instead, we should have get-or-create semantics, which are more readily understandable. This makes sense in the case of TaskMetrics because these are just aggregated metrics that we want to collect throughout the task, so it doesn't matter who's incrementing them. Parent PR: #10717 Author: Andrew Or <andrew@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Author: andrewor14 <andrew@databricks.com> Closes #10815 from andrewor14/get-or-create-metrics.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java3
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java3
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java4
3 files changed, 3 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a06dc1ce91..dc4f289ae7 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -114,8 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.serializer = Serializer.getSerializer(dep.serializer());
this.shuffleBlockResolver = shuffleBlockResolver;
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index c8cc705697..d3d79a27ea 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -119,8 +119,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
this.partitioner = dep.partitioner();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 68dc0c6d41..a6edc1ad3f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -122,9 +122,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- // TODO: metrics tracking + integration with shuffle write metrics
- // need to connect the write metrics to task metrics so we count the spill IO somewhere.
- this.writeMetrics = new ShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(