aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java25
1 files changed, 14 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0142..56cdc22f36 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -138,7 +138,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
- blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+ blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
- final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
- boolean copyThrewException = true;
- try {
- lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
- copyThrewException = false;
- } finally {
- Closeables.close(in, copyThrewException);
- }
- if (!partitionWriters[i].fileSegment().file().delete()) {
- logger.error("Unable to delete file for partition {}", i);
+ final File file = partitionWriters[i].fileSegment().file();
+ if (file.exists()) {
+ final FileInputStream in = new FileInputStream(file);
+ boolean copyThrewException = true;
+ try {
+ lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+ copyThrewException = false;
+ } finally {
+ Closeables.close(in, copyThrewException);
+ }
+ if (!file.delete()) {
+ logger.error("Unable to delete file for partition {}", i);
+ }
}
}
threwException = false;