aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java9
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java13
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala102
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala1
10 files changed, 158 insertions, 46 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index ee82d67993..a1a1fb0142 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
@@ -155,9 +155,10 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
writer.commitAndClose();
}
- partitionLengths =
- writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ File tmp = Utils.tempFileWith(output);
+ partitionLengths = writePartitionedFile(tmp);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 6a0a89e81c..744c3008ca 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -41,7 +41,7 @@ import org.apache.spark.annotation.Private;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
-import org.apache.spark.io.LZFCompressionCodec;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
@@ -53,7 +53,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.Utils;
@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -206,8 +206,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
+ final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ final File tmp = Utils.tempFileWith(output);
try {
- partitionLengths = mergeSpills(spills);
+ partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
@@ -215,7 +217,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
}
}
}
- shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
@@ -248,8 +250,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
*
* @return the partition lengths in the merged file.
*/
- private long[] mergeSpills(SpillInfo[] spills) throws IOException {
- final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index cd253a78c2..39fadd8783 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -21,13 +21,13 @@ import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -84,17 +84,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
- // Because of previous failures, the shuffle file may already exist on this machine.
- // If so, remove it.
- if (blockFile.exists) {
- if (blockFile.delete()) {
- logInfo(s"Removed existing shuffle file $blockFile")
- } else {
- logWarning(s"Failed to remove existing shuffle file $blockFile")
- }
- }
- blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
- writeMetrics)
+ val tmp = Utils.tempFileWith(blockFile)
+ blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 5e4c2b5d0a..05b1eed7f3 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,13 +21,12 @@ import java.io._
import com.google.common.io.ByteStreams
-import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
-
-import IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.{SparkEnv, Logging, SparkConf}
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,10 +39,13 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+private[spark] class IndexShuffleBlockResolver(
+ conf: SparkConf,
+ _blockManager: BlockManager = null)
+ extends ShuffleBlockResolver
with Logging {
- private lazy val blockManager = SparkEnv.get.blockManager
+ private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -75,13 +77,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
}
/**
+ * Check whether the given index and data files match each other.
+ * If so, return the partition lengths in the data file. Otherwise return null.
+ */
+ private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
+ // the index file should have `block + 1` longs as offset.
+ if (index.length() != (blocks + 1) * 8) {
+ return null
+ }
+ val lengths = new Array[Long](blocks)
+ // Read the lengths of blocks
+ val in = try {
+ new DataInputStream(new BufferedInputStream(new FileInputStream(index)))
+ } catch {
+ case e: IOException =>
+ return null
+ }
+ try {
+ // Convert the offsets into lengths of each block
+ var offset = in.readLong()
+ if (offset != 0L) {
+ return null
+ }
+ var i = 0
+ while (i < blocks) {
+ val off = in.readLong()
+ lengths(i) = off - offset
+ offset = off
+ i += 1
+ }
+ } catch {
+ case e: IOException =>
+ return null
+ } finally {
+ in.close()
+ }
+
+ // the size of data file should match with index file
+ if (data.length() == lengths.sum) {
+ lengths
+ } else {
+ null
+ }
+ }
+
+ /**
* Write an index file with the offsets of each block, plus a final offset at the end for the
* end of the output file. This will be used by getBlockData to figure out where each block
* begins and ends.
+ *
+ * It will commit the data and index file as an atomic operation, use the existing ones, or
+ * replace them with new ones.
+ *
+ * Note: the `lengths` will be updated to match the existing index file if use the existing ones.
* */
- def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
+ def writeIndexFileAndCommit(
+ shuffleId: Int,
+ mapId: Int,
+ lengths: Array[Long],
+ dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
- val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
+ val indexTmp = Utils.tempFileWith(indexFile)
+ val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
@@ -93,6 +150,37 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
} {
out.close()
}
+
+ val dataFile = getDataFile(shuffleId, mapId)
+ // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
+ // the following check and rename are atomic.
+ synchronized {
+ val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
+ if (existingLengths != null) {
+ // Another attempt for the same task has already written our map outputs successfully,
+ // so just use the existing partition lengths and delete our temporary map outputs.
+ System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+ if (dataTmp != null && dataTmp.exists()) {
+ dataTmp.delete()
+ }
+ indexTmp.delete()
+ } else {
+ // This is the first successful attempt in writing the map outputs for this task,
+ // so override any existing index and data files with the ones we wrote.
+ if (indexFile.exists()) {
+ indexFile.delete()
+ }
+ if (dataFile.exists()) {
+ dataFile.delete()
+ }
+ if (!indexTmp.renameTo(indexFile)) {
+ throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
+ }
+ if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
+ throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+ }
+ }
+ }
}
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 41df70c602..412bf70000 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -17,6 +17,8 @@
package org.apache.spark.shuffle.hash
+import java.io.IOException
+
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
@@ -106,6 +108,29 @@ private[spark] class HashShuffleWriter[K, V](
writer.commitAndClose()
writer.fileSegment().length
}
+ // rename all shuffle files to final paths
+ // Note: there is only one ShuffleBlockResolver in executor
+ shuffleBlockResolver.synchronized {
+ shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
+ val output = blockManager.diskBlockManager.getFile(writer.blockId)
+ if (sizes(i) > 0) {
+ if (output.exists()) {
+ // Use length of existing file and delete our own temporary one
+ sizes(i) = output.length()
+ writer.file.delete()
+ } else {
+ // Commit by renaming our temporary file to something the fetcher expects
+ if (!writer.file.renameTo(output)) {
+ throw new IOException(s"fail to rename ${writer.file} to $output")
+ }
+ }
+ } else {
+ if (output.exists()) {
+ output.delete()
+ }
+ }
+ }
+ }
MapStatus(blockManager.shuffleServerId, sizes)
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 808317b017..f83cf8859e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.shuffle.sort
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.Utils
import org.apache.spark.util.collection.ExternalSorter
private[spark] class SortShuffleWriter[K, V, C](
@@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
- val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+ val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+ val tmp = Utils.tempFileWith(output)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
- val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
- shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
-
+ val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
+ shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c374b93766..661c706af3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,10 +21,10 @@ import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
-import scala.util.control.NonFatal
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
+import scala.util.control.NonFatal
import sun.nio.ch.DirectBuffer
@@ -38,9 +38,8 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{SerializerInstance, Serializer}
+import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util._
private[spark] sealed trait BlockValues
@@ -660,7 +659,7 @@ private[spark] class BlockManager(
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
- syncWrites, writeMetrics)
+ syncWrites, writeMetrics, blockId)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 80d426fadc..e2dd80f243 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -34,14 +34,15 @@ import org.apache.spark.util.Utils
* reopened again.
*/
private[spark] class DiskBlockObjectWriter(
- file: File,
+ val file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
// These write metrics concurrently shared with other active DiskBlockObjectWriters who
// are themselves performing writes. All updates must be relative.
- writeMetrics: ShuffleWriteMetrics)
+ writeMetrics: ShuffleWriteMetrics,
+ val blockId: BlockId = null)
extends OutputStream
with Logging {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 316c194ff3..1b3acb8ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,8 +21,8 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent._
+import java.util.{Locale, Properties, Random, UUID}
import javax.net.ssl.HttpsURLConnection
import scala.collection.JavaConverters._
@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
@@ -42,7 +42,6 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
-
import tachyon.TachyonURI
import tachyon.client.{TachyonFS, TachyonFile}
@@ -2169,6 +2168,13 @@ private[spark] object Utils extends Logging {
val resource = createResource
try f.apply(resource) finally resource.close()
}
+
+ /**
+ * Returns a path of temporary file which is in the same directory with `path`.
+ */
+ def tempFileWith(path: File): File = {
+ new File(path.getAbsolutePath + "." + UUID.randomUUID())
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index bd6844d045..2440139ac9 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -638,7 +638,6 @@ private[spark] class ExternalSorter[K, V, C](
* called by the SortShuffleWriter.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
- * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
def writePartitionedFile(