diff options
Diffstat (limited to 'core')
8 files changed, 57 insertions, 14 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index ab790b7850..172c6e4b1c 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { ctx.flush(); return; } - long length = file.length(); + long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index cae983ed4c..7601ffe416 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -25,6 +25,7 @@ class TaskContext( val stageId: Int, val partitionId: Int, val attemptId: Long, + val executorId: String, val runningLocally: Boolean = false, @volatile var interrupted: Boolean = false, private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty() diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 032eb04f43..eb12c26d24 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -206,7 +206,7 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() - val value = task.run(taskId.toInt) + val value = task.run(taskId.toInt, executorId) val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ed1b36d18e..29c6108252 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask( try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) + shuffle = blockManager.shuffleBlockManager.forShuffle( + dep.shuffleId, context.executorId, numOutputSplits, ser) buckets = shuffle.acquireWriters(partitionId) // Write the map output to its associated buckets. diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1fe0d0e4e2..64fe5b196a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream */ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { - def run(attemptId: Long): T = { - context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) + final def run(attemptId: Long, executorId: String): T = { + context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false) if (_killed) { kill() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2f96590c57..1f173c7722 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,6 +578,7 @@ private[spark] class BlockManager( val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { + diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 05a14c9094..6208856e56 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,12 +17,13 @@ package org.apache.spark.storage -import org.apache.spark.serializer.Serializer +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.serializer.Serializer private[spark] -class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) - +class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) private[spark] trait ShuffleBlocks { @@ -30,24 +31,63 @@ trait ShuffleBlocks { def releaseWriters(group: ShuffleWriterGroup) } +/** + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer + * per reducer. + * + * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle + * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, + * it releases them for another task. + * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple: + * - shuffleId: The unique id given to the entire shuffle stage. + * - executorId: The id of the executor running the task. Required in order to ensure that + * multiple executors running on the same node do not collide. + * - bucketId: The id of the output partition (i.e., reducer id) + * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a + * time owns a particular fileId, and this id is returned to a pool when the task finishes. + */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { + /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */ + val consolidateShuffleFiles = + System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean + + var nextFileId = new AtomicInteger(0) + val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { + def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = { new ShuffleBlocks { // Get a group of writers for a map task. override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) + val filename = physicalFileName(shuffleId, executorId, bucketId, fileId) + blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, writers) + new ShuffleWriterGroup(mapId, fileId, writers) } - override def releaseWriters(group: ShuffleWriterGroup) = { - // Nothing really to release here. + override def releaseWriters(group: ShuffleWriterGroup) { + recycleFileId(group.fileId) } } } + + private def getUnusedFileId(): Int = { + val fileId = unusedFileIds.poll() + if (fileId == null) nextFileId.getAndIncrement() + else fileId + } + + private def recycleFileId(fileId: Int) { + if (!consolidateShuffleFiles) { return } // ensures we always generate new file id + unusedFileIds.add(fileId) + } + + private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index e31a116a75..668cd5d489 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte val func = (c: TaskContext, i: Iterator[String]) => i.next val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0) intercept[RuntimeException] { - task.run(0) + task.run(0, "test") } assert(completed === true) } |