aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <patrick@databricks.com>2015-04-01 23:42:09 -0700
committerPatrick Wendell <patrick@databricks.com>2015-04-01 23:42:09 -0700
commit6562787b963204763a33e1c4e9d192db913af1fc (patch)
treebea8f2a26853552afed8a5397fac412f634e4d9e /core
parent40df5d49bb5c80cd3a1e2d7c853c0b5ea901adf3 (diff)
downloadspark-6562787b963204763a33e1c4e9d192db913af1fc.tar.gz
spark-6562787b963204763a33e1c4e9d192db913af1fc.tar.bz2
spark-6562787b963204763a33e1c4e9d192db913af1fc.zip
[SPARK-6627] Some clean-up in shuffle code.
Before diving into review #4450 I did a look through the existing shuffle code to learn how it works. Unfortunately, there are some very confusing things in this code. This patch makes a few small changes to simplify things. It is not easily to concisely describe the changes because of how convoluted the issues were, but they are fairly small logically: 1. There is a trait named `ShuffleBlockManager` that only deals with one logical function which is retrieving shuffle block data given shuffle block coordinates. This trait has two implementors FileShuffleBlockManager and IndexShuffleBlockManager. Confusingly the vast majority of those implementations have nothing to do with this particular functionality. So I've renamed the trait to ShuffleBlockResolver and documented it. 2. The aforementioned trait had two almost identical methods, for no good reason. I removed one method (getBytes) and modified callers to use the other one. I think the behavior is preserved in all cases. 3. The sort shuffle code uses an identifier "0" in the reduce slot of a BlockID as a placeholder. I made it into a constant since it needs to be consistent across multiple places. I think for (3) there is actually a better solution that would avoid the need to do this type of workaround/hack in the first place, but it's more complex so I'm punting it for now. Author: Patrick Wendell <patrick@databricks.com> Closes #5286 from pwendell/cleanup and squashes the following commits: c71fbc7 [Patrick Wendell] Open interface back up for testing f36edd5 [Patrick Wendell] Code review feedback d1c0494 [Patrick Wendell] Style fix a406079 [Patrick Wendell] [HOTFIX] Some clean-up in shuffle code.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala (renamed from core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala)14
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala2
11 files changed, 50 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index d0178dfde6..5be3ed771e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -67,7 +67,7 @@ private[spark] trait ShuffleWriterGroup {
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
- extends ShuffleBlockManager with Logging {
+ extends ShuffleBlockResolver with Logging {
private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -175,11 +175,6 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}
- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- val segment = getBlockData(blockId)
- Some(segment.nioByteBuffer())
- }
-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
if (consolidateShuffleFiles) {
// Search all file groups associated with this shuffle.
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 87fd161e06..50edb5a34e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -27,6 +27,8 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
+import IndexShuffleBlockManager.NOOP_REDUCE_ID
+
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
@@ -39,25 +41,18 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
private lazy val blockManager = SparkEnv.get.blockManager
private val transportConf = SparkTransportConf.fromSparkConf(conf)
- /**
- * Mapping to a single shuffleBlockId with reduce ID 0.
- * */
- def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
- ShuffleBlockId(shuffleId, mapId, 0)
- }
-
def getDataFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
/**
@@ -97,10 +92,6 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
}
}
- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- Some(getBlockData(blockId).nioByteBuffer())
- }
-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
@@ -123,3 +114,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
override def stop(): Unit = {}
}
+
+private[spark] object IndexShuffleBlockManager {
+ // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+ // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+ // shuffle outputs for several reduces are glommed into a single file.
+ // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+ val NOOP_REDUCE_ID = 0
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
index b521f0c7fc..4342b0d598 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -22,15 +22,19 @@ import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId
private[spark]
-trait ShuffleBlockManager {
+/**
+ * Implementers of this trait understand how to retrieve block data for a logical shuffle block
+ * identifier (i.e. map, reduce, and shuffle). Implementations may use files or file segments to
+ * encapsulate shuffle data. This is used by the BlockStore to abstract over different shuffle
+ * implementations when shuffle data is retrieved.
+ */
+trait ShuffleBlockResolver {
type ShuffleId = Int
/**
- * Get shuffle block data managed by the local ShuffleBlockManager.
- * @return Some(ByteBuffer) if block found, otherwise None.
+ * Retrieve the data for the specified block. If the data for that block is not available,
+ * throws an unspecified exception.
*/
- def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
-
def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
def stop(): Unit
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index a44a8e1249..978366d1a1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -55,7 +55,10 @@ private[spark] trait ShuffleManager {
*/
def unregisterShuffle(shuffleId: Int): Boolean
- def shuffleBlockManager: ShuffleBlockManager
+ /**
+ * Return a resolver capable of retrieving shuffle block data based on block coordinates.
+ */
+ def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
index b934480cfb..f6e6fe5def 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.scheduler.MapStatus
* Obtained inside a map task to write out records to the shuffle system.
*/
private[spark] trait ShuffleWriter[K, V] {
- /** Write a bunch of records to this task's output */
+ /** Write a sequence of records to this task's output */
def write(records: Iterator[_ <: Product2[K, V]]): Unit
/** Close this writer, passing along whether the map completed */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 62e0629b34..2a7df8dd5b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -53,20 +53,20 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
new HashShuffleWriter(
- shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+ shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
- shuffleBlockManager.removeShuffle(shuffleId)
+ shuffleBlockResolver.removeShuffle(shuffleId)
}
- override def shuffleBlockManager: FileShuffleBlockManager = {
+ override def shuffleBlockResolver: FileShuffleBlockManager = {
fileShuffleBlockManager
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index bda30a56d8..0497036192 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -58,7 +58,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
new SortShuffleWriter(
- shuffleBlockManager, baseShuffleHandle, mapId, context)
+ shuffleBlockResolver, baseShuffleHandle, mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
@@ -66,18 +66,19 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
if (shuffleMapNumber.containsKey(shuffleId)) {
val numMaps = shuffleMapNumber.remove(shuffleId)
(0 until numMaps).map{ mapId =>
- shuffleBlockManager.removeDataByMap(shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}
- override def shuffleBlockManager: IndexShuffleBlockManager = {
+ override def shuffleBlockResolver: IndexShuffleBlockManager = {
indexShuffleBlockManager
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 55ea0f17b1..a066435df6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -58,8 +58,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
- sorter = new ExternalSorter[K, V, V](
- None, Some(dep.partitioner), None, dep.serializer)
+ sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}
@@ -67,7 +66,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
@@ -100,3 +99,4 @@ private[spark] class SortShuffleWriter[K, V, C](
}
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1dff09a75d..fc31296f4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -301,7 +301,7 @@ private[spark] class BlockManager(
*/
override def getBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
- shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+ shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
@@ -439,14 +439,10 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockManager
- shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match {
- case Some(bytes) =>
- Some(bytes)
- case None =>
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
+ val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ // TODO: This should gracefully handle case where local block is not available. Currently
+ // downstream code will throw an exception.
+ Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b962c101c9..7bd3c7852a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -664,6 +664,8 @@ private[spark] class ExternalSorter[K, V, C](
}
/**
+ * Exposed for testing purposes.
+ *
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -673,7 +675,7 @@ private[spark] class ExternalSorter[K, V, C](
* For now, we just merge all the spilled files in once pass, but this can be modified to
* support hierarchical merging.
*/
- def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
+ def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
if (spills.isEmpty && partitionWriters == null) {
@@ -781,7 +783,7 @@ private[spark] class ExternalSorter[K, V, C](
/**
* Read a partition file back as an iterator (used in our iterator method)
*/
- def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
+ private def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
if (writer.isOpen) {
writer.commitAndClose()
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 6790388f96..b834dc0e73 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -54,7 +54,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)