author     Andrew Or <andrew@databricks.com>        2016-01-18 19:22:29 -0800
committer  Josh Rosen <joshrosen@databricks.com>    2016-01-18 19:22:29 -0800
commit     2b5d11f34d73eb7117c0c4668c1abb27dcc3a403 (patch)
tree       424b549a6ad291ead0468de16654d06c25f3b5d0 /core
parent     323d51f1dadf733e413203d678cb3f76e4d68981 (diff)
[SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics
This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup and introduces no change in functionality. It just renames 3 fields for consistency. Today we have:

```
inputMetrics.recordsRead
outputMetrics.bytesWritten
shuffleReadMetrics.localBlocksFetched
...
shuffleWriteMetrics.shuffleRecordsWritten
shuffleWriteMetrics.shuffleBytesWritten
shuffleWriteMetrics.shuffleWriteTime
```

The shuffle write ones are redundant: we can drop the `shuffle` part of the method names. I added backward-compatible (but deprecated) methods with the old names.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10811 from andrewor14/rename-things.
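As a quick reference, the new accessor surface boils down to the following sketch, condensed from the ShuffleWriteMetrics.scala hunk below. The real class lives in `org.apache.spark.executor` and also keeps its `private[spark]` mutators, which are omitted here so the snippet compiles on its own:

```scala
// Condensed sketch of the renamed accessors plus the deprecated aliases; the
// names and deprecation messages mirror the ShuffleWriteMetrics.scala diff below.
class ShuffleWriteMetricsSketch extends Serializable {
  @volatile private var _bytesWritten: Long = 0L
  @volatile private var _writeTime: Long = 0L
  @volatile private var _recordsWritten: Long = 0L

  // New, shorter names introduced by this patch
  def bytesWritten: Long = _bytesWritten
  def writeTime: Long = _writeTime
  def recordsWritten: Long = _recordsWritten

  // Old names kept as deprecated forwarders for backward compatibility
  @deprecated("use bytesWritten instead", "2.0.0")
  def shuffleBytesWritten: Long = bytesWritten
  @deprecated("use writeTime instead", "2.0.0")
  def shuffleWriteTime: Long = writeTime
  @deprecated("use recordsWritten instead", "2.0.0")
  def shuffleRecordsWritten: Long = recordsWritten
}
```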
Diffstat (limited to 'core')
 core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java   |  4
 core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java          |  4
 core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java            |  6
 core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java            | 10
 core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala              | 37
 core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                   |  2
 core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala          |  2
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala            |  2
 core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala           | 12
 core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala             | 12
 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                      |  2
 core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala               |  8
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                         | 14
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                         | 13
 core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala     |  4
 core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala            |  4
 core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java       | 20
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala                              |  4
 core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala           |  2
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala              |  2
 core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala |  8
 core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala        | 54
 core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala          |  2
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                    | 12
 24 files changed, 126 insertions(+), 114 deletions(-)
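For downstream code, migration is mechanical. Below is a minimal caller-side sketch, modeled on the listener in ShuffleSuite further down, of reading the renamed fields from a task-end event; the old accessors still compile but now emit deprecation warnings. The `ShuffleWriteLogger` name is just for illustration:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class ShuffleWriteLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // shuffleWriteMetrics is still an Option at this point, hence the foreach.
    taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m =>
      println(s"shuffle write: ${m.recordsWritten} records, " +
        s"${m.bytesWritten} bytes, ${m.writeTime} ns")
      // Deprecated equivalents: m.shuffleRecordsWritten, m.shuffleBytesWritten,
      // m.shuffleWriteTime
    }
  }
}

// Registered in the usual way, e.g. sc.addSparkListener(new ShuffleWriteLogger)
```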
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 56cdc22f36..a06dc1ce91 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -143,7 +143,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
while (records.hasNext()) {
final Product2<K, V> record = records.next();
@@ -203,7 +203,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
threwException = false;
} finally {
Closeables.close(out, threwException);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 9affff8014..2c84de5bf2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -233,8 +233,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
- writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
- taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+ writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c3008ca..c8cc705697 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -298,8 +298,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
- writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
- writeMetrics.incShuffleBytesWritten(outputFile.length());
+ writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+ writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
@@ -411,7 +411,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
diff --git a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
index dc2aa30466..5d0555a8c2 100644
--- a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
+++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
@@ -42,34 +42,34 @@ public final class TimeTrackingOutputStream extends OutputStream {
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 469ebe26c7..24795f8600 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
+
/**
* Number of bytes written for the shuffle by this task
*/
- @volatile private var _shuffleBytesWritten: Long = _
- def shuffleBytesWritten: Long = _shuffleBytesWritten
- private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
- private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+ @volatile private var _bytesWritten: Long = _
+ def bytesWritten: Long = _bytesWritten
+ private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
+ private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
- @volatile private var _shuffleWriteTime: Long = _
- def shuffleWriteTime: Long = _shuffleWriteTime
- private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
- private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+ @volatile private var _writeTime: Long = _
+ def writeTime: Long = _writeTime
+ private[spark] def incWriteTime(value: Long) = _writeTime += value
+ private[spark] def decWriteTime(value: Long) = _writeTime -= value
/**
* Total number of records written to the shuffle by this task
*/
- @volatile private var _shuffleRecordsWritten: Long = _
- def shuffleRecordsWritten: Long = _shuffleRecordsWritten
- private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
- private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
- private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+ @volatile private var _recordsWritten: Long = _
+ def recordsWritten: Long = _recordsWritten
+ private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
+ private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
+ private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+
+ // Legacy methods for backward compatibility.
+ // TODO: remove these once we make this class private.
+ @deprecated("use bytesWritten instead", "2.0.0")
+ def shuffleBytesWritten: Long = bytesWritten
+ @deprecated("use writeTime instead", "2.0.0")
+ def shuffleWriteTime: Long = writeTime
+ @deprecated("use recordsWritten instead", "2.0.0")
+ def shuffleRecordsWritten: Long = recordsWritten
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3130a65240..f5267f58c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {
// Shuffle write
showBytesDistribution("shuffle bytes written:",
- (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
+ (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
// Fetch & I/O
showMillisDistribution("fetch wait time:",
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 294e16cde1..2970968f0b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+ writeMetrics.incWriteTime(System.nanoTime - openStartTime)
override def releaseWriters(success: Boolean) {
shuffleState.completedMapTasks.add(mapId)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index f83cf8859e..5c5a5f5a4c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,7 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val startTime = System.nanoTime()
sorter.stop()
context.taskMetrics.shuffleWriteMetrics.foreach(
- _.incShuffleWriteTime(System.nanoTime - startTime))
+ _.incWriteTime(System.nanoTime - startTime))
sorter = null
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 341ae78236..078718ba11 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
raw.shuffleWriteMetrics
}
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
- writeBytes = submetricQuantiles(_.shuffleBytesWritten),
- writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
- writeTime = submetricQuantiles(_.shuffleWriteTime)
+ writeBytes = submetricQuantiles(_.bytesWritten),
+ writeRecords = submetricQuantiles(_.recordsWritten),
+ writeTime = submetricQuantiles(_.writeTime)
)
}.metricOption
@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {
def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
new ShuffleWriteMetrics(
- bytesWritten = internal.shuffleBytesWritten,
- writeTime = internal.shuffleWriteTime,
- recordsWritten = internal.shuffleRecordsWritten
+ bytesWritten = internal.bytesWritten,
+ writeTime = internal.writeTime,
+ recordsWritten = internal.recordsWritten
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index e36a367323..c34d49c0d9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
- writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
+ writeMetrics.incWriteTime(System.nanoTime() - start)
}
} {
objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
close()
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
- writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+ writeMetrics.incBytesWritten(finalPosition - reportedPosition)
} else {
finalPosition = file.length()
}
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
// truncating the file to its initial position.
try {
if (initialized) {
- writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
- writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
+ writeMetrics.decBytesWritten(reportedPosition - initialPosition)
+ writeMetrics.decRecordsWritten(numRecordsWritten)
objOut.flush()
bs.flush()
close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
*/
def recordWritten(): Unit = {
numRecordsWritten += 1
- writeMetrics.incShuffleRecordsWritten(1)
+ writeMetrics.incRecordsWritten(1)
if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
- writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
+ writeMetrics.incBytesWritten(pos - reportedPosition)
reportedPosition = pos
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 2d955a6660..160d7a4dff 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
}
metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+ executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ca37829216..4a9f8b3052 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
val shuffleWriteDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
+ (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta
val shuffleWriteRecordsDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
+ (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6d4066a870..914f6183cc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantiles(shuffleReadRemoteSizes)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+ metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
val shuffleWriteTime =
(metricsOpt.flatMap(_.shuffleWriteMetrics
- .map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong
+ .map(_.writeTime)).getOrElse(0L) / 1e6).toLong
val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)
val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -930,13 +930,13 @@ private[ui] class TaskDataSource(
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
val shuffleWriteReadable = maybeShuffleWrite
- .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
val shuffleWriteRecords = maybeShuffleWrite
- .map(_.shuffleRecordsWritten.toString).getOrElse("")
+ .map(_.recordsWritten.toString).getOrElse("")
- val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+ val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
val writeTimeSortable = maybeWriteTime.getOrElse(0L)
val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
if (ms == 0) "" else UIUtils.formatDuration(ms)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a6460bc8b8..b88221a249 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -331,10 +331,11 @@ private[spark] object JsonProtocol {
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
+ // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
- ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
- ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
- ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
+ ("Shuffle Bytes Written" -> shuffleWriteMetrics.bytesWritten) ~
+ ("Shuffle Write Time" -> shuffleWriteMetrics.writeTime) ~
+ ("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten)
}
def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
@@ -752,9 +753,9 @@ private[spark] object JsonProtocol {
def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
val metrics = new ShuffleWriteMetrics
- metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
- metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
- metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
+ metrics.incBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
+ metrics.incWriteTime((json \ "Shuffle Write Time").extract[Long])
+ metrics.setRecordsWritten((json \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0))
metrics
}
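Note that the serialized event-log format is untouched: the JSON keys keep their old "Shuffle ..." names (hence the TODO above), so logs written before this patch still parse. An illustrative Scala snippet with made-up values:

```scala
import org.json4s.jackson.JsonMethods.parse

object LegacyShuffleWriteJson {
  // Hand-written sample of the block emitted by shuffleWriteMetricsToJson.
  val sample = parse(
    """{
      |  "Shuffle Bytes Written": 1024,
      |  "Shuffle Write Time": 500000,
      |  "Shuffle Records Written": 8
      |}""".stripMargin)
  // JsonProtocol.shuffleWriteMetricsFromJson(sample) populates the renamed
  // bytesWritten / writeTime / recordsWritten fields from these legacy keys.
}
```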
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4a44481cf4..ff9dad7d38 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C](
val w = writer
writer = null
w.commitAndClose()
- _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
- batchSizes.append(curWriteMetrics.shuffleBytesWritten)
+ _diskBytesSpilled += curWriteMetrics.bytesWritten
+ batchSizes.append(curWriteMetrics.bytesWritten)
objectsWritten = 0
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 63ba954a7f..4c7416e00b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C](
val w = writer
writer = null
w.commitAndClose()
- _diskBytesSpilled += spillMetrics.shuffleBytesWritten
- batchSizes.append(spillMetrics.shuffleBytesWritten)
+ _diskBytesSpilled += spillMetrics.bytesWritten
+ batchSizes.append(spillMetrics.bytesWritten)
spillMetrics = null
objectsWritten = 0
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 5fe64bde36..625fdd57eb 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -279,8 +279,8 @@ public class UnsafeShuffleWriterSuite {
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().get().bytesWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
}
@@ -311,10 +311,10 @@ public class UnsafeShuffleWriterSuite {
HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
- assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+ assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
- assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+ assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
private void testMergingSpills(
@@ -354,11 +354,11 @@ public class UnsafeShuffleWriterSuite {
assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
- assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+ assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
- assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+ assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
@Test
@@ -416,11 +416,11 @@ public class UnsafeShuffleWriterSuite {
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
- assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+ assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
- assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+ assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
@Test
@@ -437,11 +437,11 @@ public class UnsafeShuffleWriterSuite {
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
- assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+ assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
- assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+ assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
@Test
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c45d81459e..6ffa1c8ac1 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -450,8 +450,8 @@ object ShuffleSuite {
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m =>
- recordsWritten += m.shuffleRecordsWritten
- bytesWritten += m.shuffleBytesWritten
+ recordsWritten += m.recordsWritten
+ bytesWritten += m.bytesWritten
}
taskEnd.taskMetrics.shuffleReadMetrics.foreach { m =>
recordsRead += m.recordsRead
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index aaf62e0f91..e5a448298a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -212,7 +212,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
metrics.inputMetrics.foreach(inputRead += _.recordsRead)
metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
- metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+ metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten)
}
})
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index dc15f5932d..c87158d89f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -269,7 +269,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskMetrics.inputMetrics should not be ('defined)
taskMetrics.outputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
- taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
+ taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
}
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index ef6ce04e3f..fdacd8c9f5 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -145,8 +145,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(outputFile.length() === 0)
assert(temporaryFilesCreated.isEmpty)
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
- assert(shuffleWriteMetrics.shuffleBytesWritten === 0)
- assert(shuffleWriteMetrics.shuffleRecordsWritten === 0)
+ assert(shuffleWriteMetrics.bytesWritten === 0)
+ assert(shuffleWriteMetrics.recordsWritten === 0)
assert(taskMetrics.diskBytesSpilled === 0)
assert(taskMetrics.memoryBytesSpilled === 0)
}
@@ -169,8 +169,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
- assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
- assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length)
+ assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
+ assert(shuffleWriteMetrics.recordsWritten === records.length)
assert(taskMetrics.diskBytesSpilled === 0)
assert(taskMetrics.memoryBytesSpilled === 0)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 5d36617cfc..8eff3c2970 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -50,18 +50,18 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
- assert(writeMetrics.shuffleRecordsWritten === 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
- assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
// After 32 writes, metrics should update
for (i <- 0 until 32) {
writer.flush()
writer.write(Long.box(i), Long.box(i))
}
- assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.shuffleRecordsWritten === 33)
+ assert(writeMetrics.bytesWritten > 0)
+ assert(writeMetrics.recordsWritten === 33)
writer.commitAndClose()
- assert(file.length() == writeMetrics.shuffleBytesWritten)
+ assert(file.length() == writeMetrics.bytesWritten)
}
test("verify write metrics on revert") {
@@ -72,19 +72,19 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
- assert(writeMetrics.shuffleRecordsWritten === 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
- assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
// After 32 writes, metrics should update
for (i <- 0 until 32) {
writer.flush()
writer.write(Long.box(i), Long.box(i))
}
- assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.shuffleRecordsWritten === 33)
+ assert(writeMetrics.bytesWritten > 0)
+ assert(writeMetrics.recordsWritten === 33)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleBytesWritten == 0)
- assert(writeMetrics.shuffleRecordsWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
+ assert(writeMetrics.recordsWritten == 0)
}
test("Reopening a closed block writer") {
@@ -109,11 +109,11 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.commitAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- assert(writeMetrics.shuffleRecordsWritten === 1000)
+ val bytesWritten = writeMetrics.bytesWritten
+ assert(writeMetrics.recordsWritten === 1000)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 1000)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+ assert(writeMetrics.recordsWritten === 1000)
+ assert(writeMetrics.bytesWritten === bytesWritten)
}
test("commitAndClose() should be idempotent") {
@@ -125,13 +125,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.commitAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- val writeTime = writeMetrics.shuffleWriteTime
- assert(writeMetrics.shuffleRecordsWritten === 1000)
+ val bytesWritten = writeMetrics.bytesWritten
+ val writeTime = writeMetrics.writeTime
+ assert(writeMetrics.recordsWritten === 1000)
writer.commitAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 1000)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
- assert(writeMetrics.shuffleWriteTime === writeTime)
+ assert(writeMetrics.recordsWritten === 1000)
+ assert(writeMetrics.bytesWritten === bytesWritten)
+ assert(writeMetrics.writeTime === writeTime)
}
test("revertPartialWritesAndClose() should be idempotent") {
@@ -143,13 +143,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.revertPartialWritesAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- val writeTime = writeMetrics.shuffleWriteTime
- assert(writeMetrics.shuffleRecordsWritten === 0)
+ val bytesWritten = writeMetrics.bytesWritten
+ val writeTime = writeMetrics.writeTime
+ assert(writeMetrics.recordsWritten === 0)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 0)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
- assert(writeMetrics.shuffleWriteTime === writeTime)
+ assert(writeMetrics.recordsWritten === 0)
+ assert(writeMetrics.bytesWritten === bytesWritten)
+ assert(writeMetrics.writeTime === writeTime)
}
test("fileSegment() can only be called after commitAndClose() has been called") {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e02f5a1b20..ee2d56a679 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -277,7 +277,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
- shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+ shuffleWriteMetrics.incBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
taskMetrics.incDiskBytesSpilled(base + 5)
taskMetrics.incMemoryBytesSpilled(base + 6)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 068e8397c8..9dd400fc1c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -227,7 +227,7 @@ class JsonProtocolSuite extends SparkFunSuite {
.removeField { case (field, _) => field == "Shuffle Records Written" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
- assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+ assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
}
test("OutputMetrics backward compatibility") {
@@ -568,8 +568,8 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def assertEquals(metrics1: ShuffleWriteMetrics, metrics2: ShuffleWriteMetrics) {
- assert(metrics1.shuffleBytesWritten === metrics2.shuffleBytesWritten)
- assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
+ assert(metrics1.bytesWritten === metrics2.bytesWritten)
+ assert(metrics1.writeTime === metrics2.writeTime)
}
private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
@@ -794,9 +794,9 @@ class JsonProtocolSuite extends SparkFunSuite {
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
- sw.incShuffleBytesWritten(a + b + c)
- sw.incShuffleWriteTime(b + c + d)
- sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+ sw.incBytesWritten(a + b + c)
+ sw.incWriteTime(b + c + d)
+ sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks