aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-01-18 19:22:29 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-01-18 19:22:29 -0800
commit2b5d11f34d73eb7117c0c4668c1abb27dcc3a403 (patch)
tree424b549a6ad291ead0468de16654d06c25f3b5d0 /core/src/main/java/org
parent323d51f1dadf733e413203d678cb3f76e4d68981 (diff)
downloadspark-2b5d11f34d73eb7117c0c4668c1abb27dcc3a403.tar.gz
spark-2b5d11f34d73eb7117c0c4668c1abb27dcc3a403.tar.bz2
spark-2b5d11f34d73eb7117c0c4668c1abb27dcc3a403.zip
[SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics
This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just renames 3 fields for consistency. Today we have: ``` inputMetrics.recordsRead outputMetrics.bytesWritten shuffleReadMetrics.localBlocksFetched ... shuffleWriteMetrics.shuffleRecordsWritten shuffleWriteMetrics.shuffleBytesWritten shuffleWriteMetrics.shuffleWriteTime ``` The shuffle write ones are kind of redundant. We can drop the `shuffle` part in the method names. I added backward compatible (but deprecated) methods with the old names. Parent PR: #10717 Author: Andrew Or <andrew@databricks.com> Closes #10811 from andrewor14/rename-things.
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java4
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java4
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java6
-rw-r--r--core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java10
4 files changed, 12 insertions, 12 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 56cdc22f36..a06dc1ce91 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -143,7 +143,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
while (records.hasNext()) {
final Product2<K, V> record = records.next();
@@ -203,7 +203,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
threwException = false;
} finally {
Closeables.close(out, threwException);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 9affff8014..2c84de5bf2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -233,8 +233,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
- writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
- taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+ writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c3008ca..c8cc705697 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -298,8 +298,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
- writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
- writeMetrics.incShuffleBytesWritten(outputFile.length());
+ writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+ writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
@@ -411,7 +411,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
diff --git a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
index dc2aa30466..5d0555a8c2 100644
--- a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
+++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
@@ -42,34 +42,34 @@ public final class TimeTrackingOutputStream extends OutputStream {
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}