author     jerryshao <sshao@hortonworks.com>      2016-01-13 16:34:23 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2016-01-13 16:34:23 -0800
commit     cd81fc9e8652c07b84f0887a24d67381b4e605fa
tree       83f75886b61d38a0650101f8dfddcd418ad0b018
parent     eabc7b8ee7e809bab05361ed154f87bff467bd88
[SPARK-12400][SHUFFLE] Avoid generating temp shuffle files for empty partitions
The problem lies in `BypassMergeSortShuffleWriter`: even an empty partition generates a temp shuffle file of several bytes. This change stops opening each partition's disk writer eagerly, so a temp file is only created for a partition that actually receives records. The issue is specific to this writer; there is no such issue in `HashShuffleWriter`. Please help to review, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #10376 from jerryshao/SPARK-12400.
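Why dropping the eager `.open()` call avoids creating the file: the disk writer is handed a path up front but only materializes the file when it is opened, and `DiskBlockObjectWriter` opens itself lazily on the first write. The sketch below is a minimal, hypothetical illustration of that lazy-open pattern; `LazyDiskWriter` is an invented name for illustration only, not Spark's actual writer class.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the lazy-open pattern the fix relies on: the writer
// holds a File path, but nothing is created on disk until the first write.
final class LazyDiskWriter {
  private final File file;
  private OutputStream out; // null until the first record arrives

  LazyDiskWriter(File file) {
    this.file = file; // just a path; no file exists yet
  }

  void write(byte[] record) throws IOException {
    if (out == null) {
      out = new FileOutputStream(file); // the file is created here, lazily
    }
    out.write(record);
  }

  void close() throws IOException {
    if (out != null) {
      out.close();
    }
  }
}

A partition that never receives a record therefore never creates its file, which is exactly what the `file.exists()` check in the second hunk below relies on.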
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java        | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 38
2 files changed, 51 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0142..56cdc22f36 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -138,7 +138,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       final File file = tempShuffleBlockIdPlusFile._2();
       final BlockId blockId = tempShuffleBlockIdPlusFile._1();
       partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
     }
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
-        final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
-        boolean copyThrewException = true;
-        try {
-          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
-          copyThrewException = false;
-        } finally {
-          Closeables.close(in, copyThrewException);
-        }
-        if (!partitionWriters[i].fileSegment().file().delete()) {
-          logger.error("Unable to delete file for partition {}", i);
+        final File file = partitionWriters[i].fileSegment().file();
+        if (file.exists()) {
+          final FileInputStream in = new FileInputStream(file);
+          boolean copyThrewException = true;
+          try {
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+            copyThrewException = false;
+          } finally {
+            Closeables.close(in, copyThrewException);
+          }
+          if (!file.delete()) {
+            logger.error("Unable to delete file for partition {}", i);
+          }
         }
       }
       threwException = false;
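The `file.exists()` guard above is the commit-side counterpart: temp files that were never materialized are skipped, and their entry in `lengths` stays zero. Below is a self-contained, hypothetical sketch of the same guarded-concatenation pattern; `PartitionConcat` is an invented name, and NIO's `transferTo` stands in for the real code's `Utils.copyStream(..., transferToEnabled)`.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical sketch of the guarded concatenation: copy each per-partition
// temp file into the single output file, skipping partitions whose file was
// never created (their length stays 0), then delete each temp file.
final class PartitionConcat {
  static long[] concat(File[] partitionFiles, File output) throws IOException {
    long[] lengths = new long[partitionFiles.length];
    try (FileOutputStream out = new FileOutputStream(output)) {
      FileChannel outChannel = out.getChannel();
      for (int i = 0; i < partitionFiles.length; i++) {
        File file = partitionFiles[i];
        if (!file.exists()) {
          continue; // empty partition: its temp file was never created
        }
        try (FileChannel in = new FileInputStream(file).getChannel()) {
          lengths[i] = in.transferTo(0, in.size(), outChannel);
        }
        if (!file.delete()) {
          System.err.println("Unable to delete file for partition " + i);
        }
      }
    }
    return lengths;
  }
}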
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index e33408b94e..ef6ce04e3f 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       new Answer[(TempShuffleBlockId, File)] {
         override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = {
           val blockId = new TempShuffleBlockId(UUID.randomUUID)
-          val file = File.createTempFile(blockId.toString, null, tempDir)
+          val file = new File(tempDir, blockId.name)
           blockIdToFileMap.put(blockId, file)
           temporaryFilesCreated.append(file)
           (blockId, file)
@@ -166,6 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     writer.stop( /* success = */ true)
     assert(temporaryFilesCreated.nonEmpty)
     assert(writer.getPartitionLengths.sum === outputFile.length())
+    assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
     val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
     assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
@@ -174,6 +175,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(taskMetrics.memoryBytesSpilled === 0)
   }
 
+  test("only generate temp shuffle file for non-empty partition") {
+    // Use an exception mid-stream to check that only non-empty partitions create temp
+    // shuffle files: temp shuffle files are only cleaned up by stop(false) in the failure
+    // case, so before that call we can observe which files actually exist.
+    def records: Iterator[(Int, Int)] =
+      Iterator((1, 1), (5, 5)) ++
+        (0 until 100000).iterator.map { i =>
+          if (i == 99990) {
+            throw new SparkException("intentional failure")
+          } else {
+            (2, 2)
+          }
+        }
+
+    val writer = new BypassMergeSortShuffleWriter[Int, Int](
+      blockManager,
+      blockResolver,
+      shuffleHandle,
+      0, // MapId
+      taskContext,
+      conf
+    )
+
+    intercept[SparkException] {
+      writer.write(records)
+    }
+
+    assert(temporaryFilesCreated.nonEmpty)
+    // Only 3 temp shuffle files will be created: one per non-empty partition (keys 1, 2 and 5)
+    assert(temporaryFilesCreated.count(_.exists()) === 3)
+
+    writer.stop( /* success = */ false)
+    assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
+  }
+
   test("cleanup of intermediate files after errors") {
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
       blockManager,