author     Andrew Or <andrew@databricks.com>        2016-01-18 19:22:29 -0800
committer  Josh Rosen <joshrosen@databricks.com>    2016-01-18 19:22:29 -0800
commit     2b5d11f34d73eb7117c0c4668c1abb27dcc3a403 (patch)
tree       424b549a6ad291ead0468de16654d06c25f3b5d0 /core/src/test/scala/org/apache
parent     323d51f1dadf733e413203d678cb3f76e4d68981 (diff)
[SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics
This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just renames 3 fields for consistency. Today we have:

```
inputMetrics.recordsRead
outputMetrics.bytesWritten
shuffleReadMetrics.localBlocksFetched
...
shuffleWriteMetrics.shuffleRecordsWritten
shuffleWriteMetrics.shuffleBytesWritten
shuffleWriteMetrics.shuffleWriteTime
```

The shuffle write ones are kind of redundant. We can drop the `shuffle` part in the method names. I added backward compatible (but deprecated) methods with the old names.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10811 from andrewor14/rename-things.
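For context on what "backward compatible (but deprecated) methods" means here, the pattern is to make the short names the primary accessors and keep the old `shuffle*`-prefixed names as deprecated forwarders. The Scala sketch below illustrates that pattern only; the class name `ShuffleWriteMetricsSketch`, the field layout, and the `"2.0.0"` deprecation version are illustrative assumptions, not the actual `ShuffleWriteMetrics` source (which lives outside the test tree shown in this diff).

```scala
// Minimal sketch, not the real Spark class: the new short names are the
// primary accessors, and the old shuffle*-prefixed names forward to them.
class ShuffleWriteMetricsSketch extends Serializable {
  private var _bytesWritten: Long = 0L
  private var _recordsWritten: Long = 0L
  private var _writeTime: Long = 0L

  // New, shorter names used by the updated tests in this diff.
  def bytesWritten: Long = _bytesWritten
  def recordsWritten: Long = _recordsWritten
  def writeTime: Long = _writeTime

  def incBytesWritten(v: Long): Unit = { _bytesWritten += v }
  def incWriteTime(v: Long): Unit = { _writeTime += v }
  def setRecordsWritten(v: Long): Unit = { _recordsWritten = v }

  // Old names kept so existing callers still compile, but flagged as deprecated.
  @deprecated("use bytesWritten instead", "2.0.0")
  def shuffleBytesWritten: Long = bytesWritten
  @deprecated("use recordsWritten instead", "2.0.0")
  def shuffleRecordsWritten: Long = recordsWritten
  @deprecated("use writeTime instead", "2.0.0")
  def shuffleWriteTime: Long = writeTime
}
```

With aliases like these, callers such as the tests below can switch from `m.shuffleBytesWritten` to `m.bytesWritten` with no behavioral change, while unported code keeps compiling and merely emits a deprecation warning.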
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                                    |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala                 |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala                    |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala  |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala              | 54
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala                |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                          | 12
7 files changed, 42 insertions, 42 deletions
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c45d81459e..6ffa1c8ac1 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -450,8 +450,8 @@ object ShuffleSuite {
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m =>
- recordsWritten += m.shuffleRecordsWritten
- bytesWritten += m.shuffleBytesWritten
+ recordsWritten += m.recordsWritten
+ bytesWritten += m.bytesWritten
}
taskEnd.taskMetrics.shuffleReadMetrics.foreach { m =>
recordsRead += m.recordsRead
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index aaf62e0f91..e5a448298a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -212,7 +212,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
metrics.inputMetrics.foreach(inputRead += _.recordsRead)
metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
- metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+ metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten)
}
})
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index dc15f5932d..c87158d89f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -269,7 +269,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskMetrics.inputMetrics should not be ('defined)
taskMetrics.outputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
- taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
+ taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
}
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index ef6ce04e3f..fdacd8c9f5 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -145,8 +145,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(outputFile.length() === 0)
assert(temporaryFilesCreated.isEmpty)
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
- assert(shuffleWriteMetrics.shuffleBytesWritten === 0)
- assert(shuffleWriteMetrics.shuffleRecordsWritten === 0)
+ assert(shuffleWriteMetrics.bytesWritten === 0)
+ assert(shuffleWriteMetrics.recordsWritten === 0)
assert(taskMetrics.diskBytesSpilled === 0)
assert(taskMetrics.memoryBytesSpilled === 0)
}
@@ -169,8 +169,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
- assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
- assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length)
+ assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
+ assert(shuffleWriteMetrics.recordsWritten === records.length)
assert(taskMetrics.diskBytesSpilled === 0)
assert(taskMetrics.memoryBytesSpilled === 0)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 5d36617cfc..8eff3c2970 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -50,18 +50,18 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
- assert(writeMetrics.shuffleRecordsWritten === 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
- assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
// After 32 writes, metrics should update
for (i <- 0 until 32) {
writer.flush()
writer.write(Long.box(i), Long.box(i))
}
- assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.shuffleRecordsWritten === 33)
+ assert(writeMetrics.bytesWritten > 0)
+ assert(writeMetrics.recordsWritten === 33)
writer.commitAndClose()
- assert(file.length() == writeMetrics.shuffleBytesWritten)
+ assert(file.length() == writeMetrics.bytesWritten)
}
test("verify write metrics on revert") {
@@ -72,19 +72,19 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
- assert(writeMetrics.shuffleRecordsWritten === 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
- assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
// After 32 writes, metrics should update
for (i <- 0 until 32) {
writer.flush()
writer.write(Long.box(i), Long.box(i))
}
- assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.shuffleRecordsWritten === 33)
+ assert(writeMetrics.bytesWritten > 0)
+ assert(writeMetrics.recordsWritten === 33)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleBytesWritten == 0)
- assert(writeMetrics.shuffleRecordsWritten == 0)
+ assert(writeMetrics.bytesWritten == 0)
+ assert(writeMetrics.recordsWritten == 0)
}
test("Reopening a closed block writer") {
@@ -109,11 +109,11 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.commitAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- assert(writeMetrics.shuffleRecordsWritten === 1000)
+ val bytesWritten = writeMetrics.bytesWritten
+ assert(writeMetrics.recordsWritten === 1000)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 1000)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+ assert(writeMetrics.recordsWritten === 1000)
+ assert(writeMetrics.bytesWritten === bytesWritten)
}
test("commitAndClose() should be idempotent") {
@@ -125,13 +125,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.commitAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- val writeTime = writeMetrics.shuffleWriteTime
- assert(writeMetrics.shuffleRecordsWritten === 1000)
+ val bytesWritten = writeMetrics.bytesWritten
+ val writeTime = writeMetrics.writeTime
+ assert(writeMetrics.recordsWritten === 1000)
writer.commitAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 1000)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
- assert(writeMetrics.shuffleWriteTime === writeTime)
+ assert(writeMetrics.recordsWritten === 1000)
+ assert(writeMetrics.bytesWritten === bytesWritten)
+ assert(writeMetrics.writeTime === writeTime)
}
test("revertPartialWritesAndClose() should be idempotent") {
@@ -143,13 +143,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.write(i, i)
}
writer.revertPartialWritesAndClose()
- val bytesWritten = writeMetrics.shuffleBytesWritten
- val writeTime = writeMetrics.shuffleWriteTime
- assert(writeMetrics.shuffleRecordsWritten === 0)
+ val bytesWritten = writeMetrics.bytesWritten
+ val writeTime = writeMetrics.writeTime
+ assert(writeMetrics.recordsWritten === 0)
writer.revertPartialWritesAndClose()
- assert(writeMetrics.shuffleRecordsWritten === 0)
- assert(writeMetrics.shuffleBytesWritten === bytesWritten)
- assert(writeMetrics.shuffleWriteTime === writeTime)
+ assert(writeMetrics.recordsWritten === 0)
+ assert(writeMetrics.bytesWritten === bytesWritten)
+ assert(writeMetrics.writeTime === writeTime)
}
test("fileSegment() can only be called after commitAndClose() has been called") {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e02f5a1b20..ee2d56a679 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -277,7 +277,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
- shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+ shuffleWriteMetrics.incBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
taskMetrics.incDiskBytesSpilled(base + 5)
taskMetrics.incMemoryBytesSpilled(base + 6)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 068e8397c8..9dd400fc1c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -227,7 +227,7 @@ class JsonProtocolSuite extends SparkFunSuite {
.removeField { case (field, _) => field == "Shuffle Records Written" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
- assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+ assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
}
test("OutputMetrics backward compatibility") {
@@ -568,8 +568,8 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def assertEquals(metrics1: ShuffleWriteMetrics, metrics2: ShuffleWriteMetrics) {
- assert(metrics1.shuffleBytesWritten === metrics2.shuffleBytesWritten)
- assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
+ assert(metrics1.bytesWritten === metrics2.bytesWritten)
+ assert(metrics1.writeTime === metrics2.writeTime)
}
private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
@@ -794,9 +794,9 @@ class JsonProtocolSuite extends SparkFunSuite {
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
- sw.incShuffleBytesWritten(a + b + c)
- sw.incShuffleWriteTime(b + c + d)
- sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+ sw.incBytesWritten(a + b + c)
+ sw.incWriteTime(b + c + d)
+ sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks