diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-10-18 14:41:53 -0700 |
---|---|---|
committer | Josh Rosen <rosenville@gmail.com> | 2013-10-19 19:53:12 -0700 |
commit | 6925a1322b7c46ed8fa2ec0e1aee4c4f4b28fa0a (patch) | |
tree | 0031d3383691217838c2512dc4b7cc6cef1d5761 /core | |
parent | 82791856519a5237a7bc9261e788d73fcac695fb (diff) | |
download | spark-6925a1322b7c46ed8fa2ec0e1aee4c4f4b28fa0a.tar.gz spark-6925a1322b7c46ed8fa2ec0e1aee4c4f4b28fa0a.tar.bz2 spark-6925a1322b7c46ed8fa2ec0e1aee4c4f4b28fa0a.zip |
Code de-duplication in put() and putBytes().
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 229 |
1 files changed, 89 insertions, 140 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index dc6f18a768..9bf1994d17 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -530,17 +530,24 @@ private[spark] class BlockManager( * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, - tellMaster: Boolean = true) : Long = { + tellMaster: Boolean = true) : Long = { + require(values != null, "Values is null") + doPut(blockId, Left(values), level, tellMaster) + } - if (blockId == null) { - throw new IllegalArgumentException("Block Id is null") - } - if (values == null) { - throw new IllegalArgumentException("Values is null") - } - if (level == null || !level.isValid) { - throw new IllegalArgumentException("Storage level is null or invalid") - } + /** + * Put a new block of serialized bytes to the block manager. + */ + def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, + tellMaster: Boolean = true) { + require(bytes != null, "Bytes is null") + doPut(blockId, Right(bytes), level, tellMaster) + } + + private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer], + level: StorageLevel, tellMaster: Boolean = true): Long = { + require(blockId != null, "BlockId is null") + require(level != null && level.isValid, "StorageLevel is null or invalid") // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will @@ -556,7 +563,8 @@ private[spark] class BlockManager( return oldBlockOpt.get.size } - // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? + // TODO: So the block info exists - but previous attempt to load it (?) failed. + // What do we do now ? Retry on it ? oldBlockOpt.get } else { tinfo @@ -565,10 +573,10 @@ private[spark] class BlockManager( val startTimeMs = System.currentTimeMillis - // If we need to replicate the data, we'll want access to the values, but because our - // put will read the whole iterator, there will be no values left. For the case where - // the put serializes data, we'll remember the bytes, above; but for the case where it - // doesn't, such as deserialized storage, let's rely on the put returning an Iterator. + // If we're storing values and we need to replicate the data, we'll want access to the values, + // but because our put will read the whole iterator, there will be no values left. For the + // case where the put serializes data, we'll remember the bytes, above; but for the case where + // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. var valuesAfterPut: Iterator[Any] = null // Ditto for the bytes after the put @@ -577,30 +585,57 @@ private[spark] class BlockManager( // Size of the block in bytes (to return to caller) var size = 0L + // If we're storing bytes, then initiate the replication before storing them locally. + // This is faster as data is already serialized and ready to send. + val replicationFuture = if (data.isRight && level.replication > 1) { + val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper + Future { + replicate(blockId, bufferView, level) + } + } else { + null + } + myInfo.synchronized { logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") var marked = false try { - if (level.useMemory) { - // Save it just to memory first, even if it also has useDisk set to true; we will later - // drop it to disk if the memory store can't hold it. - val res = memoryStore.putValues(blockId, values, level, true) - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case Left(newIterator) => valuesAfterPut = newIterator + data match { + case Left(values) => { + if (level.useMemory) { + // Save it just to memory first, even if it also has useDisk set to true; we will + // drop it to disk later if the memory store can't hold it. + val res = memoryStore.putValues(blockId, values, level, true) + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case Left(newIterator) => valuesAfterPut = newIterator + } + } else { + // Save directly to disk. + // Don't get back the bytes unless we replicate them. + val askForBytes = level.replication > 1 + val res = diskStore.putValues(blockId, values, level, askForBytes) + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case _ => + } + } } - } else { - // Save directly to disk. - // Don't get back the bytes unless we replicate them. - val askForBytes = level.replication > 1 - val res = diskStore.putValues(blockId, values, level, askForBytes) - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case _ => + case Right(bytes) => { + if (level.useMemory) { + // Store it only in memory at first, even if useDisk is also set to true + bytes.rewind() + memoryStore.putBytes(blockId, bytes, level) + size = bytes.limit + } else { + bytes.rewind() + diskStore.putBytes(blockId, bytes, level) + size = bytes.limit + } } } @@ -625,125 +660,39 @@ private[spark] class BlockManager( } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) - // Replicate block if required + // Either we're storing bytes and we asynchronously started replication, or we're storing + // values and need to serialize and replicate them now: if (level.replication > 1) { - val remoteStartTime = System.currentTimeMillis - // Serialize the block if not already done - if (bytesAfterPut == null) { - if (valuesAfterPut == null) { - throw new SparkException( - "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") - } - bytesAfterPut = dataSerialize(blockId, valuesAfterPut) - } - replicate(blockId, bytesAfterPut, level) - logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime)) - } - BlockManager.dispose(bytesAfterPut) - - return size - } - - - /** - * Put a new block of serialized bytes to the block manager. - */ - def putBytes( - blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) { - - if (blockId == null) { - throw new IllegalArgumentException("Block Id is null") - } - if (bytes == null) { - throw new IllegalArgumentException("Bytes is null") - } - if (level == null || !level.isValid) { - throw new IllegalArgumentException("Storage level is null or invalid") - } - - // Remember the block's storage level so that we can correctly drop it to disk if it needs - // to be dropped right after it got put into memory. Note, however, that other threads will - // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = { - val tinfo = new BlockInfo(level, tellMaster) - // Do atomically ! - val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) - - if (oldBlockOpt.isDefined) { - if (oldBlockOpt.get.waitForReady()) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return - } - - // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? - oldBlockOpt.get - } else { - tinfo - } - } - - val startTimeMs = System.currentTimeMillis - - // Initiate the replication before storing it locally. This is faster as - // data is already serialized and ready for sending - val replicationFuture = if (level.replication > 1) { - val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper - Future { - replicate(blockId, bufferView, level) - } - } else { - null - } - - myInfo.synchronized { - logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) - + " to get into synchronized block") - - var marked = false - try { - if (level.useMemory) { - // Store it only in memory at first, even if useDisk is also set to true - bytes.rewind() - memoryStore.putBytes(blockId, bytes, level) - } else { - bytes.rewind() - diskStore.putBytes(blockId, bytes, level) - } - - // assert (0 == bytes.position(), "" + bytes) - - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. - marked = true - myInfo.markReady(bytes.limit) - if (tellMaster) { - reportBlockStatus(blockId, myInfo) - } - } finally { - // If we failed at putting the block to memory/disk, notify other possible readers - // that it has failed, and then remove it from the block info map. - if (! marked) { - // Note that the remove must happen before markFailure otherwise another thread - // could've inserted a new BlockInfo before we remove it. - blockInfo.remove(blockId) - myInfo.markFailure() - logWarning("Putting block " + blockId + " failed") + data match { + case Right(bytes) => Await.ready(replicationFuture, Duration.Inf) + case Left(values) => { + val remoteStartTime = System.currentTimeMillis + // Serialize the block if not already done + if (bytesAfterPut == null) { + if (valuesAfterPut == null) { + throw new SparkException( + "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") + } + bytesAfterPut = dataSerialize(blockId, valuesAfterPut) + } + replicate(blockId, bytesAfterPut, level) + logDebug("Put block " + blockId + " remotely took " + + Utils.getUsedTimeMs(remoteStartTime)) } } } - // If replication had started, then wait for it to finish - if (level.replication > 1) { - Await.ready(replicationFuture, Duration.Inf) - } + BlockManager.dispose(bytesAfterPut) if (level.replication > 1) { - logDebug("PutBytes for block " + blockId + " with replication took " + + logDebug("Put for block " + blockId + " with replication took " + Utils.getUsedTimeMs(startTimeMs)) } else { - logDebug("PutBytes for block " + blockId + " without replication took " + + logDebug("Put for block " + blockId + " without replication took " + Utils.getUsedTimeMs(startTimeMs)) } + + size } /** |