-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 10
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java          | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala       | 80
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala          | 14
4 files changed, 73 insertions(+), 49 deletions(-)
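
All four shuffle writers in this change adopt the same discipline: map output is first written to a temporary file created next to the final output, the result is committed through IndexShuffleBlockResolver.writeIndexFileAndCommit, and a finally block guarantees the temporary file is removed on every exit path. A minimal Scala sketch of that pattern (the helper name withTempFileCleanup is illustrative, not part of the patch; Utils.tempFileWith is approximated with a UUID suffix):

    import java.io.File
    import java.util.UUID

    // Sketch of the cleanup discipline this patch applies to each writer.
    // The helper name is hypothetical; only java.io.File semantics are assumed.
    def withTempFileCleanup[T](output: File)(writeAndCommit: File => T): T = {
      // Create the temp file next to the final output, as Utils.tempFileWith does.
      val tmp = new File(output.getAbsolutePath + "." + UUID.randomUUID())
      try {
        writeAndCommit(tmp) // write partition data, then commit (rename) tmp
      } finally {
        // On success the commit renamed tmp away, so exists() is false and this
        // is a no-op; on failure the leftover temp file is deleted here.
        if (tmp.exists() && !tmp.delete()) {
          System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
    }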
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0fcc56d50a..4a15559e55 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -160,8 +160,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
-    partitionLengths = writePartitionedFile(tmp);
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    try {
+      partitionLengths = writePartitionedFile(tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
+    }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
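
The new finally block is a safety net rather than the primary cleanup path: when writeIndexFileAndCommit succeeds it renames tmp into place (or deletes it when another attempt already committed), so tmp.exists() is false and the delete does nothing. A self-contained demonstration that a failure mid-write leaves no temp file behind (the failure is simulated with a plain IOException; nothing Spark-specific is assumed):

    import java.io.{File, FileOutputStream, IOException}

    val output = File.createTempFile("shuffle", ".data")
    val tmp = new File(output.getAbsolutePath + ".tmp")
    try {
      try {
        val out = new FileOutputStream(tmp)
        try {
          out.write(Array[Byte](1, 2, 3))
          throw new IOException("simulated failure mid-write")
        } finally {
          out.close()
        }
      } finally {
        // Mirrors the patch: delete the temp file on any exit path.
        if (tmp.exists() && !tmp.delete()) {
          System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
    } catch {
      case _: IOException => assert(!tmp.exists()) // nothing was leaked
    }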
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 63d376b44f..f235c434be 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -210,15 +210,21 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     final File tmp = Utils.tempFileWith(output);
     try {
-      partitionLengths = mergeSpills(spills, tmp);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", spill.file.getPath());
+      try {
+        partitionLengths = mergeSpills(spills, tmp);
+      } finally {
+        for (SpillInfo spill : spills) {
+          if (spill.file.exists() && ! spill.file.delete()) {
+            logger.error("Error while deleting spill file {}", spill.file.getPath());
+          }
         }
       }
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
     }
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
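
The nesting here is deliberate: the inner finally deletes the spill files whether or not mergeSpills throws, and the outer finally removes tmp even when the merge or the commit fails. Inner finally blocks always run before outer ones, so the cleanup order is fixed. A tiny Scala sketch of that ordering (the stub and the printed labels are illustrative only):

    // mergeSpillsStub stands in for a mergeSpills call that fails.
    def mergeSpillsStub(): Long = throw new RuntimeException("merge failed")

    try {
      try {
        try {
          mergeSpillsStub()
        } finally {
          println("1: spill files deleted (inner finally)")
        }
        println("unreachable: commit is skipped because the merge threw")
      } finally {
        println("2: temp output file deleted (outer finally)")
      }
    } catch {
      case e: RuntimeException => println(s"3: caller sees '${e.getMessage}'")
    }
    // Prints 1, 2, 3 in that order.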
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 94d8c0d0fd..8d6396bede 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -139,48 +139,54 @@ private[spark] class IndexShuffleBlockResolver(
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-    Utils.tryWithSafeFinally {
-      // We take in lengths of each block, need to convert it to offsets.
-      var offset = 0L
-      out.writeLong(offset)
-      for (length <- lengths) {
-        offset += length
+    try {
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+      Utils.tryWithSafeFinally {
+        // We take in lengths of each block, need to convert it to offsets.
+        var offset = 0L
         out.writeLong(offset)
+        for (length <- lengths) {
+          offset += length
+          out.writeLong(offset)
+        }
+      } {
+        out.close()
       }
-    } {
-      out.close()
-    }

-    val dataFile = getDataFile(shuffleId, mapId)
-    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
-    // the following check and rename are atomic.
-    synchronized {
-      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
-      if (existingLengths != null) {
-        // Another attempt for the same task has already written our map outputs successfully,
-        // so just use the existing partition lengths and delete our temporary map outputs.
-        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
-        if (dataTmp != null && dataTmp.exists()) {
-          dataTmp.delete()
-        }
-        indexTmp.delete()
-      } else {
-        // This is the first successful attempt in writing the map outputs for this task,
-        // so override any existing index and data files with the ones we wrote.
-        if (indexFile.exists()) {
-          indexFile.delete()
-        }
-        if (dataFile.exists()) {
-          dataFile.delete()
-        }
-        if (!indexTmp.renameTo(indexFile)) {
-          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
-        }
-        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
-          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+      val dataFile = getDataFile(shuffleId, mapId)
+      // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
+      // the following check and rename are atomic.
+      synchronized {
+        val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
+        if (existingLengths != null) {
+          // Another attempt for the same task has already written our map outputs successfully,
+          // so just use the existing partition lengths and delete our temporary map outputs.
+          System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (dataTmp != null && dataTmp.exists()) {
+            dataTmp.delete()
+          }
+          indexTmp.delete()
+        } else {
+          // This is the first successful attempt in writing the map outputs for this task,
+          // so override any existing index and data files with the ones we wrote.
+          if (indexFile.exists()) {
+            indexFile.delete()
+          }
+          if (dataFile.exists()) {
+            dataFile.delete()
+          }
+          if (!indexTmp.renameTo(indexFile)) {
+            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
+          }
+          if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
+            throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+          }
         }
       }
+    } finally {
+      if (indexTmp.exists() && !indexTmp.delete()) {
+        logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
+      }
     }
   }
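
The commit point in writeIndexFileAndCommit is the rename: once indexTmp.renameTo(indexFile) succeeds the temp file no longer exists, so the new finally block fires only on failure paths. The synchronized block gives first-writer-wins semantics across task attempts, since there is one resolver per executor. A stripped-down sketch of such a check-then-rename commit (the names and the lock object are illustrative, not the resolver's actual API):

    import java.io.{File, IOException}

    // Hypothetical rename-based commit with first-writer-wins semantics;
    // commitLock plays the role of the resolver's synchronized block.
    val commitLock = new Object

    def commit(tmp: File, dest: File): Unit = commitLock.synchronized {
      if (dest.exists()) {
        // Another attempt already committed: keep its output, drop ours.
        tmp.delete()
      } else if (!tmp.renameTo(dest)) {
        throw new IOException(s"fail to rename file $tmp to $dest")
      }
      // After a successful rename tmp.exists() is false, so a caller's
      // finally-based cleanup of tmp is a harmless no-op.
    }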
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index cc01e6aa7e..636b88e792 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -67,10 +67,16 @@ private[spark] class SortShuffleWriter[K, V, C](
     // (see SPARK-3570).
     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
     val tmp = Utils.tempFileWith(output)
-    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
-    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
-    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
-    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
+    try {
+      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
+      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
+      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
+      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
+      }
+    }
   }

   /** Close this writer, passing along whether the map completed */
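
A natural regression guard for this change is a test that forces a failure mid-write and then scans the output directory for leftovers. A hedged sketch under stated assumptions (the directory layout and the leftover check are hypothetical, not Spark's test suite; Utils.tempFileWith appends a random UUID suffix to the output path):

    import java.io.File

    // Hypothetical check: after a failed write, the shuffle directory should
    // contain only the expected final files and no temp files.
    def leakedTempFiles(dir: File, finalFiles: Set[String]): Seq[String] =
      Option(dir.listFiles()).getOrElse(Array.empty[File])
        .map(_.getName)
        .filterNot(finalFiles)
        .toSeq

    val dir = new File(System.getProperty("java.io.tmpdir"), s"shuffle-test-${System.nanoTime()}")
    dir.mkdirs()
    try {
      // ... run a writer against dir and make it throw mid-write ...
      assert(leakedTempFiles(dir, finalFiles = Set.empty).isEmpty)
    } finally {
      Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete())
      dir.delete()
    }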