diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 50 |
1 files changed, 18 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1a54d8ddb4..dc6f18a768 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -433,20 +433,31 @@ private[spark] class BlockManager( * Get block from remote block managers. */ def getRemote(blockId: BlockId): Option[Iterator[Any]] = { - if (blockId == null) { - throw new IllegalArgumentException("Block Id is null") - } logDebug("Getting remote block " + blockId) - // Get locations of block - val locations = Random.shuffle(master.getLocations(blockId)) + doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + } + + /** + * Get block from remote block managers as serialized bytes. + */ + def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { + logDebug("Getting remote block " + blockId + " as bytes") + doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + } - // Get block from remote locations + private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = { + require(blockId != null, "BlockId is null") + val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { logDebug("Getting remote block " + blockId + " from " + loc) val data = BlockManagerWorker.syncGetBlock( GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { - return Some(dataDeserialize(blockId, data)) + if (asValues) { + return Some(dataDeserialize(blockId, data)) + } else { + return Some(data) + } } logDebug("The value of block " + blockId + " is null") } @@ -455,31 +466,6 @@ private[spark] class BlockManager( } /** - * Get block from remote block managers as serialized bytes. - */ - def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { - // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be - // refactored. - if (blockId == null) { - throw new IllegalArgumentException("Block Id is null") - } - logDebug("Getting remote block " + blockId + " as bytes") - - val locations = master.getLocations(blockId) - for (loc <- locations) { - logDebug("Getting remote block " + blockId + " from " + loc) - val data = BlockManagerWorker.syncGetBlock( - GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) - if (data != null) { - return Some(data) - } - logDebug("The value of block " + blockId + " is null") - } - logDebug("Block " + blockId + " not found") - return None - } - - /** * Get a block from the block manager (either local or remote). */ def get(blockId: BlockId): Option[Iterator[Any]] = { |