author    Reynold Xin <rxin@databricks.com>  2016-04-15 15:39:39 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-15 15:39:39 -0700
commit    8028a28885dbd90f20e38922240618fc310a0a65 (patch)
tree      2a303488b198fdb417af37cfa6ad981b988f94ee /core/src/test/java
parent    90b46e014a60069bd18754b02fce056d8f4d1b3e (diff)
download  spark-8028a28885dbd90f20e38922240618fc310a0a65.tar.gz
          spark-8028a28885dbd90f20e38922240618fc310a0a65.tar.bz2
          spark-8028a28885dbd90f20e38922240618fc310a0a65.zip
[SPARK-14628][CORE] Simplify task metrics by always tracking read/write metrics
## What changes were proposed in this pull request?

Part of the reason why TaskMetrics and its callers are complicated is the optional metrics we collect, including input, output, shuffle read, and shuffle write. I think we can always track them and just assign 0 as the initial values. It is usually very obvious whether a task is supposed to read any data or not. By always tracking them, we can remove a lot of map, foreach, flatMap, getOrElse(0L) calls throughout Spark.

This patch also changes a few behaviors.

1. Removed the distinction of data read/write methods (e.g. Hadoop, Memory, Network, etc).
2. Accumulate all data reads and writes, rather than only the first method. (Fixes SPARK-5225)

## How was this patch tested?

Existing tests. This is based on https://github.com/apache/spark/pull/12388, with more test fixes.

Author: Reynold Xin <rxin@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #12417 from cloud-fan/metrics-refactor.
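The caller-side effect of this change is visible in the test diff below: `taskMetrics.shuffleWriteMetrics()` now returns the metrics object directly rather than an Option, so the `.get()` unwrapping disappears. As a rough illustration of the "always track, zero-initialize" pattern, here is a minimal, self-contained Java sketch; `TaskMetricsSketch` and `ShuffleWriteMetricsSketch` are hypothetical stand-ins, not Spark's actual classes:

```java
// A minimal sketch of the "always track, zero-initialize" pattern described
// above. TaskMetricsSketch and ShuffleWriteMetricsSketch are hypothetical
// stand-ins for illustration, not Spark's real classes.
public class MetricsSketch {

  static final class ShuffleWriteMetricsSketch {
    // Counters start at 0 instead of being absent (no Option wrapper).
    private long recordsWritten = 0L;
    private long bytesWritten = 0L;

    void incRecordsWritten(long n) { recordsWritten += n; }
    void incBytesWritten(long n) { bytesWritten += n; }
    long recordsWritten() { return recordsWritten; }
    long bytesWritten() { return bytesWritten; }
  }

  static final class TaskMetricsSketch {
    // Always present: a task that never shuffles simply reports zeros,
    // so callers drop the map/foreach/getOrElse(0L) unwrapping.
    private final ShuffleWriteMetricsSketch shuffleWriteMetrics =
        new ShuffleWriteMetricsSketch();

    ShuffleWriteMetricsSketch shuffleWriteMetrics() { return shuffleWriteMetrics; }
  }

  public static void main(String[] args) {
    TaskMetricsSketch taskMetrics = new TaskMetricsSketch();
    // Direct access, no .get() -- mirrors the test changes in the diff below.
    System.out.println(taskMetrics.shuffleWriteMetrics().recordsWritten()); // 0
    taskMetrics.shuffleWriteMetrics().incRecordsWritten(3L);
    System.out.println(taskMetrics.shuffleWriteMetrics().recordsWritten()); // 3
  }
}
```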
Diffstat (limited to 'core/src/test/java')
-rw-r--r-- core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 12
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 30750b1bf1..fbaaa1cf49 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -249,8 +249,8 @@ public class UnsafeShuffleWriterSuite {
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten());
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().bytesWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().bytesWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
}
@@ -279,7 +279,7 @@ public class UnsafeShuffleWriterSuite {
HashMultiset.create(dataToWrite),
HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
@@ -321,7 +321,7 @@ public class UnsafeShuffleWriterSuite {
assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
@@ -383,7 +383,7 @@ public class UnsafeShuffleWriterSuite {
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
@@ -404,7 +404,7 @@ public class UnsafeShuffleWriterSuite {
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));