diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index cfe5b6c50a..a714142763 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -64,8 +64,8 @@ private[spark] class BlockManager( extends BlockDataProvider with Logging { private val port = conf.getInt("spark.blockManager.port", 0) - val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) + + val diskBlockManager = new DiskBlockManager(this, conf) val connectionManager = new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") @@ -83,7 +83,7 @@ private[spark] class BlockManager( val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") val tachyonBlockManager = - new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster) + new TachyonBlockManager(this, tachyonStorePath, tachyonMaster) tachyonInitialized = true new TachyonStore(this, tachyonBlockManager) } @@ -215,7 +215,7 @@ private[spark] class BlockManager( override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = { val bid = BlockId(blockId) if (bid.isShuffle) { - Left(diskBlockManager.getBlockLocation(bid)) + shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]) } else { val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] if (blockBytesOpt.isDefined) { @@ -333,8 +333,14 @@ private[spark] class BlockManager( * shuffle blocks. It is safe to do so without a lock on block info since disk store * never deletes (recent) items. */ - def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - diskStore.getValues(blockId, serializer).orElse { + def getLocalShuffleFromDisk( + blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + + val shuffleBlockManager = shuffleManager.shuffleBlockManager + val values = shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map( + bytes => this.dataDeserialize(blockId, bytes, serializer)) + + values.orElse { throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be") } } @@ -355,7 +361,8 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - diskStore.getBytes(blockId) match { + val shuffleBlockManager = shuffleManager.shuffleBlockManager + shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match { case Some(bytes) => Some(bytes) case None => @@ -1045,7 +1052,6 @@ private[spark] class BlockManager( def stop(): Unit = { connectionManager.stop() - shuffleBlockManager.stop() diskBlockManager.stop() actorSystem.stop(slaveActor) blockInfo.clear() |