 core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala                             |  7
 core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala                            | 27
 core/src/main/scala/org/apache/spark/shuffle/{ShuffleBlockManager.scala => ShuffleBlockResolver.scala} | 14
 core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala                                      |  5
 core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala                                       |  2
 core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala                             |  8
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala                             |  9
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala                              |  6
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala                                        | 14
 core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala                              |  6
 core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala                        |  2
 tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala                                    |  2
 12 files changed, 51 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index d0178dfde6..5be3ed771e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -67,7 +67,7 @@ private[spark] trait ShuffleWriterGroup {
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
- extends ShuffleBlockManager with Logging {
+ extends ShuffleBlockResolver with Logging {
private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -175,11 +175,6 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}
- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- val segment = getBlockData(blockId)
- Some(segment.nioByteBuffer())
- }
-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
if (consolidateShuffleFiles) {
// Search all file groups associated with this shuffle.
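With getBytes gone, callers obtain a ManagedBuffer from getBlockData and convert it themselves. A minimal sketch of the replacement call pattern, assuming the resolver and block coordinates are supplied by the caller (the helper name is hypothetical):

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.shuffle.ShuffleBlockResolver
import org.apache.spark.storage.ShuffleBlockId

// Hypothetical helper mirroring the new call pattern: fetch the block's
// ManagedBuffer and view it as an NIO ByteBuffer.
def shuffleBytes(
    resolver: ShuffleBlockResolver,
    shuffleId: Int,
    mapId: Int,
    reduceId: Int): ByteBuffer = {
  val buf: ManagedBuffer = resolver.getBlockData(ShuffleBlockId(shuffleId, mapId, reduceId))
  buf.nioByteBuffer() // throws if the block is missing, rather than returning None
}
```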
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 87fd161e06..50edb5a34e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -27,6 +27,8 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
+import IndexShuffleBlockManager.NOOP_REDUCE_ID
+
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
@@ -39,25 +41,18 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
private lazy val blockManager = SparkEnv.get.blockManager
private val transportConf = SparkTransportConf.fromSparkConf(conf)
- /**
- * Mapping to a single shuffleBlockId with reduce ID 0.
- * */
- def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
- ShuffleBlockId(shuffleId, mapId, 0)
- }
-
def getDataFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
/**
@@ -97,10 +92,6 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
}
}
- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- Some(getBlockData(blockId).nioByteBuffer())
- }
-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
@@ -123,3 +114,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
override def stop(): Unit = {}
}
+
+private[spark] object IndexShuffleBlockManager {
+ // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+ // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+ // shuffle outputs for several reduces are glommed into a single file.
+ // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+ val NOOP_REDUCE_ID = 0
+}
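getBlockData resolves a (shuffle, map, reduce) triple to a slice of the consolidated data file by reading two adjacent offsets from the per-map index file. A self-contained sketch of that lookup using plain java.io (the real code additionally wraps the resulting slice in a FileSegmentManagedBuffer):

```scala
import java.io.{DataInputStream, File, FileInputStream}

// The index file holds (numPartitions + 1) longs; partition i's bytes in the
// data file span [offset(i), offset(i + 1)).
def segmentFor(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skip(reduceId * 8L) // each stored offset is an 8-byte long
    val offset = in.readLong()
    val nextOffset = in.readLong()
    (offset, nextOffset - offset) // (start of segment, segment length)
  } finally {
    in.close()
  }
}
```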
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
index b521f0c7fc..4342b0d598 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -22,15 +22,19 @@ import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId
private[spark]
-trait ShuffleBlockManager {
+/**
+ * Implementers of this trait understand how to retrieve block data for a logical shuffle block
+ * identifier (i.e. map, reduce, and shuffle). Implementations may use files or file segments to
+ * encapsulate shuffle data. This is used by the BlockStore to abstract over different shuffle
+ * implementations when shuffle data is retrieved.
+ */
+trait ShuffleBlockResolver {
type ShuffleId = Int
/**
- * Get shuffle block data managed by the local ShuffleBlockManager.
- * @return Some(ByteBuffer) if block found, otherwise None.
+ * Retrieve the data for the specified block. If the data for that block is not available,
+ * throws an unspecified exception.
*/
- def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
-
def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
def stop(): Unit
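The resulting contract is small: one lookup method plus stop. A toy in-memory implementation (purely illustrative, not part of the patch; it must live under the org.apache.spark package since the trait is private[spark]) shows what an implementer provides:

```scala
package org.apache.spark.shuffle

import java.nio.ByteBuffer

import scala.collection.mutable

import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.storage.ShuffleBlockId

// Toy resolver that serves blocks from memory; the real implementations
// (FileShuffleBlockManager, IndexShuffleBlockManager) serve file segments.
private[spark] class InMemoryShuffleBlockResolver extends ShuffleBlockResolver {
  private val blocks = mutable.Map.empty[ShuffleBlockId, ByteBuffer]

  def put(id: ShuffleBlockId, bytes: ByteBuffer): Unit = blocks(id) = bytes

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    val bytes = blocks.getOrElse(
      blockId, throw new IllegalStateException(s"Block $blockId not found"))
    new NioManagedBuffer(bytes.duplicate())
  }

  override def stop(): Unit = blocks.clear()
}
```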
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index a44a8e1249..978366d1a1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -55,7 +55,10 @@ private[spark] trait ShuffleManager {
*/
def unregisterShuffle(shuffleId: Int): Boolean
- def shuffleBlockManager: ShuffleBlockManager
+ /**
+ * Return a resolver capable of retrieving shuffle block data based on block coordinates.
+ */
+ def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
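Downstream components reach the resolver through this accessor. A short usage sketch under the renamed API (the wrapper function is hypothetical and the block coordinates are placeholders):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId

// Illustrative lookup through the renamed accessor.
def readShuffleBlock(shuffleId: Int, mapId: Int, reduceId: Int): ManagedBuffer = {
  val resolver = SparkEnv.get.shuffleManager.shuffleBlockResolver
  resolver.getBlockData(ShuffleBlockId(shuffleId, mapId, reduceId))
}
```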
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
index b934480cfb..f6e6fe5def 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.scheduler.MapStatus
* Obtained inside a map task to write out records to the shuffle system.
*/
private[spark] trait ShuffleWriter[K, V] {
- /** Write a bunch of records to this task's output */
+ /** Write a sequence of records to this task's output */
def write(records: Iterator[_ <: Product2[K, V]]): Unit
/** Close this writer, passing along whether the map completed */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 62e0629b34..2a7df8dd5b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -53,20 +53,20 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
new HashShuffleWriter(
- shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+ shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
- shuffleBlockManager.removeShuffle(shuffleId)
+ shuffleBlockResolver.removeShuffle(shuffleId)
}
- override def shuffleBlockManager: FileShuffleBlockManager = {
+ override def shuffleBlockResolver: FileShuffleBlockManager = {
fileShuffleBlockManager
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index bda30a56d8..0497036192 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -58,7 +58,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
new SortShuffleWriter(
- shuffleBlockManager, baseShuffleHandle, mapId, context)
+ shuffleBlockResolver, baseShuffleHandle, mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
@@ -66,18 +66,19 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
if (shuffleMapNumber.containsKey(shuffleId)) {
val numMaps = shuffleMapNumber.remove(shuffleId)
(0 until numMaps).map{ mapId =>
- shuffleBlockManager.removeDataByMap(shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}
- override def shuffleBlockManager: IndexShuffleBlockManager = {
+ override def shuffleBlockResolver: IndexShuffleBlockManager = {
indexShuffleBlockManager
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 55ea0f17b1..a066435df6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -58,8 +58,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
- sorter = new ExternalSorter[K, V, V](
- None, Some(dep.partitioner), None, dep.serializer)
+ sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}
@@ -67,7 +66,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
@@ -100,3 +99,4 @@ private[spark] class SortShuffleWriter[K, V, C](
}
}
}
+
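The inline ShuffleBlockId replaces the removed consolidateId helper: every artifact of one map task's consolidated output shares the fixed reduce ID. A sketch of the naming convention this relies on (the ID values are illustrative):

```scala
import org.apache.spark.shuffle.IndexShuffleBlockManager
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

val shuffleId = 3
val mapId = 7

// The write-time block ID and the on-disk data/index IDs all use reduce ID 0.
val writeId = ShuffleBlockId(shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val dataId = ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val indexId = ShuffleIndexBlockId(shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
// writeId.name == "shuffle_3_7_0", dataId.name == "shuffle_3_7_0.data",
// indexId.name == "shuffle_3_7_0.index"
```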
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1dff09a75d..fc31296f4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -301,7 +301,7 @@ private[spark] class BlockManager(
*/
override def getBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
- shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+ shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
@@ -439,14 +439,10 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockManager
- shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match {
- case Some(bytes) =>
- Some(bytes)
- case None =>
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
+ val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ // TODO: This should gracefully handle the case where a local block is not available. Currently
+ // downstream code will throw an exception.
+ Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
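Note the behavioral shift here: getBytes signalled a missing block with None, which this path translated into a BlockException; getBlockData instead lets downstream code throw, as the TODO records. A caller needing the old soft-failure behavior could wrap the call along these lines (hypothetical helper, not part of the patch):

```scala
import java.nio.ByteBuffer

import scala.util.Try

import org.apache.spark.shuffle.ShuffleBlockResolver
import org.apache.spark.storage.ShuffleBlockId

// Hypothetical guard: map any failure to fetch or materialize the block to None.
def tryGetShuffleBytes(resolver: ShuffleBlockResolver, id: ShuffleBlockId): Option[ByteBuffer] =
  Try(resolver.getBlockData(id).nioByteBuffer()).toOption
```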
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b962c101c9..7bd3c7852a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -664,6 +664,8 @@ private[spark] class ExternalSorter[K, V, C](
}
/**
+ * Exposed for testing purposes.
+ *
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -673,7 +675,7 @@ private[spark] class ExternalSorter[K, V, C](
* For now, we just merge all the spilled files in one pass, but this can be modified to
* support hierarchical merging.
*/
- def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
+ def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
if (spills.isEmpty && partitionWriters == null) {
@@ -781,7 +783,7 @@ private[spark] class ExternalSorter[K, V, C](
/**
* Read a partition file back as an iterator (used in our iterator method)
*/
- def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
+ private def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
if (writer.isOpen) {
writer.commitAndClose()
}
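For context on the now test-visible partitionedIterator: it yields one (partitionId, iterator) pair per partition, in partition order, and each inner iterator must be drained before advancing. A shape-only sketch, assuming sorter is an ExternalSorter that has already had insertAll called on it:

```scala
// 'sorter' is assumed to be an ExternalSorter[K, V, C] after insertAll(records).
// Partitions arrive in order; skipping ahead within a partition is not supported.
for ((partitionId, elements) <- sorter.partitionedIterator) {
  for ((key, combined) <- elements) {
    println(s"partition $partitionId: $key -> $combined")
  }
}
```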
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 6790388f96..b834dc0e73 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -54,7 +54,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 15ee95070a..6b666a0384 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -59,7 +59,7 @@ object StoragePerfTester {
val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {