diff options
author | Davies Liu <davies@databricks.com> | 2015-11-12 22:44:57 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-11-12 22:44:57 -0800 |
commit | ad960885bfee7850c18eb5338546cecf2b2e9876 (patch) | |
tree | 07f56b97c6e7f38a7400dabb98be6f1942fab184 /core/src/main/java | |
parent | ea5ae2705afa4eaadd4192c37d74c97364378cf9 (diff) | |
download | spark-ad960885bfee7850c18eb5338546cecf2b2e9876.tar.gz spark-ad960885bfee7850c18eb5338546cecf2b2e9876.tar.bz2 spark-ad960885bfee7850c18eb5338546cecf2b2e9876.zip |
[SPARK-8029] Robust shuffle writer
Currently, all the shuffle writer will write to target path directly, the file could be corrupted by other attempt of the same partition on the same executor. They should write to temporary file then rename to target path, as what we do in output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem).
This PR is based on #9214 , thanks to squito . Closes #9214
Author: Davies Liu <davies@databricks.com>
Closes #9610 from davies/safe_shuffle.
Diffstat (limited to 'core/src/main/java')
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 9 | ||||
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 13 |
2 files changed, 12 insertions, 10 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index ee82d67993..a1a1fb0142 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -125,7 +125,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { assert (partitionWriters == null); if (!records.hasNext()) { partitionLengths = new long[numPartitions]; - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); return; } @@ -155,9 +155,10 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { writer.commitAndClose(); } - partitionLengths = - writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId)); - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); + File tmp = Utils.tempFileWith(output); + partitionLengths = writePartitionedFile(tmp); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 6a0a89e81c..744c3008ca 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -41,7 +41,7 @@ import org.apache.spark.annotation.Private; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; -import org.apache.spark.io.LZFCompressionCodec; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; @@ -53,7 +53,7 @@ import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockManager; import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.Platform; -import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.util.Utils; @Private public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { @@ -206,8 +206,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; final long[] partitionLengths; + final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); + final File tmp = Utils.tempFileWith(output); try { - partitionLengths = mergeSpills(spills); + partitionLengths = mergeSpills(spills, tmp); } finally { for (SpillInfo spill : spills) { if (spill.file.exists() && ! spill.file.delete()) { @@ -215,7 +217,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { } } } - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -248,8 +250,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { * * @return the partition lengths in the merged file. */ - private long[] mergeSpills(SpillInfo[] spills) throws IOException { - final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException { final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = |