aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-11-12 22:44:57 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-12 22:44:57 -0800
commitad960885bfee7850c18eb5338546cecf2b2e9876 (patch)
tree07f56b97c6e7f38a7400dabb98be6f1942fab184 /core/src/main/java
parentea5ae2705afa4eaadd4192c37d74c97364378cf9 (diff)
downloadspark-ad960885bfee7850c18eb5338546cecf2b2e9876.tar.gz
spark-ad960885bfee7850c18eb5338546cecf2b2e9876.tar.bz2
spark-ad960885bfee7850c18eb5338546cecf2b2e9876.zip
[SPARK-8029] Robust shuffle writer
Currently, all the shuffle writer will write to target path directly, the file could be corrupted by other attempt of the same partition on the same executor. They should write to temporary file then rename to target path, as what we do in output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem). This PR is based on #9214 , thanks to squito . Closes #9214 Author: Davies Liu <davies@databricks.com> Closes #9610 from davies/safe_shuffle.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java9
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java13
2 files changed, 12 insertions, 10 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index ee82d67993..a1a1fb0142 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
@@ -155,9 +155,10 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
writer.commitAndClose();
}
- partitionLengths =
- writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ File tmp = Utils.tempFileWith(output);
+ partitionLengths = writePartitionedFile(tmp);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 6a0a89e81c..744c3008ca 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -41,7 +41,7 @@ import org.apache.spark.annotation.Private;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
-import org.apache.spark.io.LZFCompressionCodec;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
@@ -53,7 +53,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.Utils;
@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -206,8 +206,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
+ final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ final File tmp = Utils.tempFileWith(output);
try {
- partitionLengths = mergeSpills(spills);
+ partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
@@ -215,7 +217,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
}
}
}
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
@@ -248,8 +250,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
*
* @return the partition lengths in the merged file.
*/
- private long[] mergeSpills(SpillInfo[] spills) throws IOException {
- final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =