aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cfe5b6c50a..a714142763 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -64,8 +64,8 @@ private[spark] class BlockManager(
extends BlockDataProvider with Logging {
private val port = conf.getInt("spark.blockManager.port", 0)
- val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
+
+ val diskBlockManager = new DiskBlockManager(this, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
@@ -83,7 +83,7 @@ private[spark] class BlockManager(
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
val tachyonBlockManager =
- new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster)
+ new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
new TachyonStore(this, tachyonBlockManager)
}
@@ -215,7 +215,7 @@ private[spark] class BlockManager(
override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
val bid = BlockId(blockId)
if (bid.isShuffle) {
- Left(diskBlockManager.getBlockLocation(bid))
+ shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId])
} else {
val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
if (blockBytesOpt.isDefined) {
@@ -333,8 +333,14 @@ private[spark] class BlockManager(
* shuffle blocks. It is safe to do so without a lock on block info since disk store
* never deletes (recent) items.
*/
- def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
- diskStore.getValues(blockId, serializer).orElse {
+ def getLocalShuffleFromDisk(
+ blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
+
+ val shuffleBlockManager = shuffleManager.shuffleBlockManager
+ val values = shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map(
+ bytes => this.dataDeserialize(blockId, bytes, serializer))
+
+ values.orElse {
throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
}
}
@@ -355,7 +361,8 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- diskStore.getBytes(blockId) match {
+ val shuffleBlockManager = shuffleManager.shuffleBlockManager
+ shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match {
case Some(bytes) =>
Some(bytes)
case None =>
@@ -1045,7 +1052,6 @@ private[spark] class BlockManager(
def stop(): Unit = {
connectionManager.stop()
- shuffleBlockManager.stop()
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()