Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala  |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala  |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala  |  56
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala  |  2
8 files changed, 57 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index ab790b7850..172c6e4b1c 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
ctx.flush();
return;
}
- long length = file.length();
+ long length = fileSegment.length();
if (length > Integer.MAX_VALUE || length <= 0) {
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
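The one-line fix above matters once shuffle blocks share a physical file: a block is then described by a FileSegment (a file plus an offset and a length), so the server must report the segment's length rather than the length of the whole file. A minimal sketch of the distinction, with illustrative names standing in for Spark's actual FileSegment:

import java.io.File

object SegmentLengthExample extends App {
  // Illustrative stand-in: a contiguous slice of a physical file.
  case class FileSegment(file: File, offset: Long, length: Long)

  val shared = new File("/tmp/merged_shuffle_0_exec1_0_0")
  val segment = FileSegment(shared, offset = 4096, length = 512)

  // Correct: the block's own size. shared.length() would instead count every
  // block appended to the consolidated file and trip the size checks above.
  val bytesToServe: Long = segment.length
}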
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983ed4c..7601ffe416 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -25,6 +25,7 @@ class TaskContext(
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
+ val executorId: String,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 032eb04f43..eb12c26d24 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -206,7 +206,7 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
- val value = task.run(taskId.toInt)
+ val value = task.run(taskId.toInt, executorId)
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ed1b36d18e..29c6108252 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask(
try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
- shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+ shuffle = blockManager.shuffleBlockManager.forShuffle(
+ dep.shuffleId, context.executorId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partitionId)
// Write the map output to its associated buckets.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1fe0d0e4e2..64fe5b196a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
- def run(attemptId: Long): T = {
- context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+ final def run(attemptId: Long, executorId: String): T = {
+ context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false)
if (_killed) {
kill()
}
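Taken together, the Executor, Task, and TaskContext changes simply thread the executor's id from the executor down to the code that names shuffle files; making run final pushes per-task logic into runTask. A condensed, self-contained view of the resulting call path (signatures trimmed for illustration):

class TaskContext(
    val stageId: Int,
    val partitionId: Int,
    val attemptId: Long,
    val executorId: String,
    val runningLocally: Boolean = false)

abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
  @transient var context: TaskContext = _

  // The executor supplies its own id when launching the task.
  final def run(attemptId: Long, executorId: String): T = {
    context = new TaskContext(stageId, partitionId, attemptId, executorId)
    runTask(context)
  }

  def runTask(context: TaskContext): T
}

ShuffleMapTask.runTask can then read context.executorId and pass it to forShuffle, which is exactly what the ShuffleMapTask hunk above does.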
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f96590c57..1f173c7722 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -578,6 +578,7 @@ private[spark] class BlockManager(
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
+ diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
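The added line records, at the moment a writer is closed, which segment of the physical file the block ended up occupying, so later reads can locate a block inside a shared file. mapBlockToFileSegment's implementation is not part of this diff; a plausible sketch of such bookkeeping, with hypothetical names and types, is:

import java.io.File
import java.util.concurrent.ConcurrentHashMap

object BlockSegments {
  // Hypothetical registry: block name -> (file, offset, length) inside a
  // possibly consolidated physical file.
  private val segments = new ConcurrentHashMap[String, (File, Long, Long)]()

  def mapBlockToFileSegment(blockName: String, file: File, offset: Long, length: Long): Unit =
    segments.put(blockName, (file, offset, length))

  def lookup(blockName: String): Option[(File, Long, Long)] =
    Option(segments.get(blockName))
}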
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 05a14c9094..6208856e56 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -17,12 +17,13 @@
package org.apache.spark.storage
-import org.apache.spark.serializer.Serializer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.serializer.Serializer
private[spark]
-class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
-
+class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
private[spark]
trait ShuffleBlocks {
@@ -30,24 +31,63 @@ trait ShuffleBlocks {
def releaseWriters(group: ShuffleWriterGroup)
}
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
+ * per reducer.
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
+ * it releases them for another task.
+ * Internally, shuffle files are identified by a 4-tuple:
+ * - shuffleId: The unique id given to the entire shuffle stage.
+ * - executorId: The id of the executor running the task. Required in order to ensure that
+ * multiple executors running on the same node do not collide.
+ * - bucketId: The id of the output partition (i.e., reducer id)
+ * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ * time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ */
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
+ /** Turning off shuffle file consolidation causes each shuffle block to get its own file. */
+ val consolidateShuffleFiles =
+ System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
+
+ val nextFileId = new AtomicInteger(0)
+ val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
- def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
+ def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = {
new ShuffleBlocks {
// Get a group of writers for a map task.
override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ val fileId = getUnusedFileId()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+ val filename = physicalFileName(shuffleId, executorId, bucketId, fileId)
+ blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
}
- new ShuffleWriterGroup(mapId, writers)
+ new ShuffleWriterGroup(mapId, fileId, writers)
}
- override def releaseWriters(group: ShuffleWriterGroup) = {
- // Nothing really to release here.
+ override def releaseWriters(group: ShuffleWriterGroup) {
+ recycleFileId(group.fileId)
}
}
}
+
+ private def getUnusedFileId(): Int = {
+ val fileId = unusedFileIds.poll()
+ if (fileId == null) nextFileId.getAndIncrement()
+ else fileId
+ }
+
+ private def recycleFileId(fileId: Int) {
+ if (!consolidateShuffleFiles) { return } // ensures we always generate new file ids
+ unusedFileIds.add(fileId)
+ }
+
+ private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = {
+ "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)
+ }
}
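The core of the consolidation scheme is the fileId pool: fresh ids come from an AtomicInteger, and ids released by finished tasks wait in a ConcurrentLinkedQueue for reuse, so concurrently running tasks never share a file while sequential tasks do. Extracted as a self-contained sketch (the consolidation on/off toggle is omitted):

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

class FileIdPool {
  private val nextFileId = new AtomicInteger(0)
  private val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

  // Reuse a released id when one exists; otherwise mint a new one. poll()
  // returns null on an empty queue, hence the boxed Integer check.
  def acquire(): Int = {
    val recycled = unusedFileIds.poll()
    if (recycled == null) nextFileId.getAndIncrement() else recycled.intValue()
  }

  // Return an id to the pool once its task has finished writing.
  def release(fileId: Int): Unit = unusedFileIds.add(fileId)
}

With C concurrently executing shuffle tasks and R reducers, an executor thus creates on the order of C * R physical files per shuffle, rather than one file per (map task, reducer) pair.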
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e31a116a75..668cd5d489 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
val func = (c: TaskContext, i: Iterator[String]) => i.next
val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
intercept[RuntimeException] {
- task.run(0)
+ task.run(0, "test")
}
assert(completed === true)
}