diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-10-20 02:30:23 -0700 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-10-20 02:58:26 -0700 |
commit | 136b9b3a3ed358bc04b28e8d62657d56d55c2c3e (patch) | |
tree | 0d8451be862e2485fcc5d8013a34307e8192ff70 /core | |
parent | 861dc409d7209c3a8d4518708016d1b843f5c52b (diff) | |
download | spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.gz spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.bz2 spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.zip |
Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
Diffstat (limited to 'core')
8 files changed, 57 insertions, 14 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index ab790b7850..172c6e4b1c 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { ctx.flush(); return; } - long length = file.length(); + long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index cae983ed4c..7601ffe416 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -25,6 +25,7 @@ class TaskContext( val stageId: Int, val partitionId: Int, val attemptId: Long, + val executorId: String, val runningLocally: Boolean = false, @volatile var interrupted: Boolean = false, private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty() diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 032eb04f43..eb12c26d24 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -206,7 +206,7 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() - val value = task.run(taskId.toInt) + val value = task.run(taskId.toInt, executorId) val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ed1b36d18e..29c6108252 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask( try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) + shuffle = blockManager.shuffleBlockManager.forShuffle( + dep.shuffleId, context.executorId, numOutputSplits, ser) buckets = shuffle.acquireWriters(partitionId) // Write the map output to its associated buckets. diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1fe0d0e4e2..64fe5b196a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream */ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { - def run(attemptId: Long): T = { - context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) + final def run(attemptId: Long, executorId: String): T = { + context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false) if (_killed) { kill() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2f96590c57..1f173c7722 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,6 +578,7 @@ private[spark] class BlockManager( val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { + diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 05a14c9094..6208856e56 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,12 +17,13 @@ package org.apache.spark.storage -import org.apache.spark.serializer.Serializer +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.serializer.Serializer private[spark] -class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) - +class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) private[spark] trait ShuffleBlocks { @@ -30,24 +31,63 @@ trait ShuffleBlocks { def releaseWriters(group: ShuffleWriterGroup) } +/** + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer + * per reducer. + * + * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle + * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, + * it releases them for another task. + * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple: + * - shuffleId: The unique id given to the entire shuffle stage. + * - executorId: The id of the executor running the task. Required in order to ensure that + * multiple executors running on the same node do not collide. + * - bucketId: The id of the output partition (i.e., reducer id) + * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a + * time owns a particular fileId, and this id is returned to a pool when the task finishes. + */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { + /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */ + val consolidateShuffleFiles = + System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean + + var nextFileId = new AtomicInteger(0) + val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { + def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = { new ShuffleBlocks { // Get a group of writers for a map task. override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) + val filename = physicalFileName(shuffleId, executorId, bucketId, fileId) + blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, writers) + new ShuffleWriterGroup(mapId, fileId, writers) } - override def releaseWriters(group: ShuffleWriterGroup) = { - // Nothing really to release here. + override def releaseWriters(group: ShuffleWriterGroup) { + recycleFileId(group.fileId) } } } + + private def getUnusedFileId(): Int = { + val fileId = unusedFileIds.poll() + if (fileId == null) nextFileId.getAndIncrement() + else fileId + } + + private def recycleFileId(fileId: Int) { + if (!consolidateShuffleFiles) { return } // ensures we always generate new file id + unusedFileIds.add(fileId) + } + + private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index e31a116a75..668cd5d489 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte val func = (c: TaskContext, i: Iterator[String]) => i.next val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0) intercept[RuntimeException] { - task.run(0) + task.run(0, "test") } assert(completed === true) } |