author     Brian Cho <bcho@fb.com>                2016-07-24 19:36:58 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2016-07-24 19:36:58 -0700
commit     daace6014216b996bcc8937f1fdcea732b6910ca (patch)
tree       ae328f4d9fb1e11cc0034ed26665082dd92507a8 /core/src/test/scala
parent     1221ce04029154778ccb5453e348f6d116092cc5 (diff)
[SPARK-5581][CORE] When writing sorted map output file, avoid open / close between each partition

## What changes were proposed in this pull request?

Replace commitAndClose with separate commit and close to avoid opening and closing the file between partitions.

## How was this patch tested?

Run existing unit tests, add a few unit tests regarding reverts. Observed a ~20% reduction in total time in tasks on stages with shuffle writes to many partitions.

JoshRosen

Author: Brian Cho <bcho@fb.com>

Closes #13382 from dafrista/separatecommit-master.
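For context, a minimal sketch (not part of this patch) of the per-partition write loop that the separate commitAndGet()/close() calls enable; `tempDir`, `numPartitions`, and `recordsFor` are hypothetical stand-ins, while the writer construction mirrors the test suite in this diff:

```scala
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.DiskBlockObjectWriter

// Illustrative only: one writer handles every partition of the sorted map output.
val file = new File(tempDir, "sorted-map-output")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(
  file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)

val partitionLengths = new Array[Long](numPartitions)
for (partitionId <- 0 until numPartitions) {
  recordsFor(partitionId).foreach { case (key, value) => writer.write(key, value) }
  // Commit this partition's bytes as a FileSegment; the underlying streams stay
  // open so the next partition can append without reopening the file.
  partitionLengths(partitionId) = writer.commitAndGet().length
}
// With the old commitAndClose(), the file was closed after every partition and
// had to be reopened for the next one; now it is closed exactly once.
writer.close()
```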
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala  67
1 file changed, 46 insertions, 21 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index ec4ef4b2fc..059c2c2444 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -60,7 +60,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
}
assert(writeMetrics.bytesWritten > 0)
assert(writeMetrics.recordsWritten === 16385)
- writer.commitAndClose()
+ writer.commitAndGet()
+ writer.close()
assert(file.length() == writeMetrics.bytesWritten)
}
@@ -100,6 +101,40 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}
+ test("calling revertPartialWritesAndClose() on a partial write should truncate up to commit") {
+ val file = new File(tempDir, "somefile")
+ val writeMetrics = new ShuffleWriteMetrics()
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+
+ writer.write(Long.box(20), Long.box(30))
+ val firstSegment = writer.commitAndGet()
+ assert(firstSegment.length === file.length())
+ assert(writeMetrics.shuffleBytesWritten === file.length())
+
+ writer.write(Long.box(40), Long.box(50))
+
+ writer.revertPartialWritesAndClose()
+ assert(firstSegment.length === file.length())
+ assert(writeMetrics.shuffleBytesWritten === file.length())
+ }
+
+ test("calling revertPartialWritesAndClose() after commit() should have no effect") {
+ val file = new File(tempDir, "somefile")
+ val writeMetrics = new ShuffleWriteMetrics()
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+
+ writer.write(Long.box(20), Long.box(30))
+ val firstSegment = writer.commitAndGet()
+ assert(firstSegment.length === file.length())
+ assert(writeMetrics.shuffleBytesWritten === file.length())
+
+ writer.revertPartialWritesAndClose()
+ assert(firstSegment.length === file.length())
+ assert(writeMetrics.shuffleBytesWritten === file.length())
+ }
+
test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
@@ -108,7 +143,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
for (i <- 1 to 1000) {
writer.write(i, i)
}
- writer.commitAndClose()
+ writer.commitAndGet()
+ writer.close()
val bytesWritten = writeMetrics.bytesWritten
assert(writeMetrics.recordsWritten === 1000)
writer.revertPartialWritesAndClose()
@@ -116,7 +152,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(writeMetrics.bytesWritten === bytesWritten)
}
- test("commitAndClose() should be idempotent") {
+ test("commit() and close() should be idempotent") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(
@@ -124,11 +160,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
for (i <- 1 to 1000) {
writer.write(i, i)
}
- writer.commitAndClose()
+ writer.commitAndGet()
+ writer.close()
val bytesWritten = writeMetrics.bytesWritten
val writeTime = writeMetrics.writeTime
assert(writeMetrics.recordsWritten === 1000)
- writer.commitAndClose()
+ writer.commitAndGet()
+ writer.close()
assert(writeMetrics.recordsWritten === 1000)
assert(writeMetrics.bytesWritten === bytesWritten)
assert(writeMetrics.writeTime === writeTime)
@@ -152,26 +190,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(writeMetrics.writeTime === writeTime)
}
- test("fileSegment() can only be called after commitAndClose() has been called") {
+ test("commit() and close() without ever opening or writing") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(
file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
- for (i <- 1 to 1000) {
- writer.write(i, i)
- }
- intercept[IllegalStateException] {
- writer.fileSegment()
- }
+ val segment = writer.commitAndGet()
writer.close()
- }
-
- test("commitAndClose() without ever opening or writing") {
- val file = new File(tempDir, "somefile")
- val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(
- file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
- writer.commitAndClose()
- assert(writer.fileSegment().length === 0)
+ assert(segment.length === 0)
}
}