aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-10-20 02:30:23 -0700
committerAaron Davidson <aaron@databricks.com>2013-10-20 02:58:26 -0700
commit136b9b3a3ed358bc04b28e8d62657d56d55c2c3e (patch)
tree0d8451be862e2485fcc5d8013a34307e8192ff70 /core
parent861dc409d7209c3a8d4518708016d1b843f5c52b (diff)
downloadspark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.gz
spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.tar.bz2
spark-136b9b3a3ed358bc04b28e8d62657d56d55c2c3e.zip
Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created per mapper per reducer. For large or repeated jobs, this often produces millions of shuffle files, which sees extremely degredaded performance from the OS file system. This patch seeks to reduce that burden by combining multipe shuffle files into one. This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669. However, it simplifies the design in order to get the majority of the gain with less overall intellectual and code burden. The vast majority of code in this pull request is a refactor to allow the insertion of a clean layer of indirection between logical block ids and physical files. This, I feel, provides some design clarity in addition to enabling shuffle file consolidation. The main goal is to produce one shuffle file per reducer per active mapper thread. This allows us to isolate the mappers (simplifying the failure modes), while still allowing us to reduce the number of mappers tremendously for large tasks. In order to accomplish this, we simply create a new set of shuffle files for every parallel task, and return the files to a pool which will be given out to the next run task.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java2
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala56
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala2
8 files changed, 57 insertions, 14 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index ab790b7850..172c6e4b1c 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
ctx.flush();
return;
}
- long length = file.length();
+ long length = fileSegment.length();
if (length > Integer.MAX_VALUE || length <= 0) {
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983ed4c..7601ffe416 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -25,6 +25,7 @@ class TaskContext(
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
+ val executorId: String,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 032eb04f43..eb12c26d24 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -206,7 +206,7 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
- val value = task.run(taskId.toInt)
+ val value = task.run(taskId.toInt, executorId)
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ed1b36d18e..29c6108252 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask(
try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
- shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+ shuffle = blockManager.shuffleBlockManager.forShuffle(
+ dep.shuffleId, context.executorId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partitionId)
// Write the map output to its associated buckets.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1fe0d0e4e2..64fe5b196a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
- def run(attemptId: Long): T = {
- context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+ final def run(attemptId: Long, executorId: String): T = {
+ context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false)
if (_killed) {
kill()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f96590c57..1f173c7722 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -578,6 +578,7 @@ private[spark] class BlockManager(
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
+ diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 05a14c9094..6208856e56 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -17,12 +17,13 @@
package org.apache.spark.storage
-import org.apache.spark.serializer.Serializer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.serializer.Serializer
private[spark]
-class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
-
+class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
private[spark]
trait ShuffleBlocks {
@@ -30,24 +31,63 @@ trait ShuffleBlocks {
def releaseWriters(group: ShuffleWriterGroup)
}
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
+ * per reducer.
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
+ * it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
+ * - shuffleId: The unique id given to the entire shuffle stage.
+ * - executorId: The id of the executor running the task. Required in order to ensure that
+ * multiple executors running on the same node do not collide.
+ * - bucketId: The id of the output partition (i.e., reducer id)
+ * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ * time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ */
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
+ /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */
+ val consolidateShuffleFiles =
+ System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
+
+ var nextFileId = new AtomicInteger(0)
+ val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
- def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
+ def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = {
new ShuffleBlocks {
// Get a group of writers for a map task.
override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ val fileId = getUnusedFileId()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+ val filename = physicalFileName(shuffleId, executorId, bucketId, fileId)
+ blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
}
- new ShuffleWriterGroup(mapId, writers)
+ new ShuffleWriterGroup(mapId, fileId, writers)
}
- override def releaseWriters(group: ShuffleWriterGroup) = {
- // Nothing really to release here.
+ override def releaseWriters(group: ShuffleWriterGroup) {
+ recycleFileId(group.fileId)
}
}
}
+
+ private def getUnusedFileId(): Int = {
+ val fileId = unusedFileIds.poll()
+ if (fileId == null) nextFileId.getAndIncrement()
+ else fileId
+ }
+
+ private def recycleFileId(fileId: Int) {
+ if (!consolidateShuffleFiles) { return } // ensures we always generate new file id
+ unusedFileIds.add(fileId)
+ }
+
+ private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = {
+ "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e31a116a75..668cd5d489 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
val func = (c: TaskContext, i: Iterator[String]) => i.next
val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
intercept[RuntimeException] {
- task.run(0)
+ task.run(0, "test")
}
assert(completed === true)
}