diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-16 03:19:24 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-16 03:19:24 +0530 |
commit | dd2b64ec97ad241b6f171cac0dbb1841b185675a (patch) | |
tree | 9032642536092f2029bcca87fc6b59d2287b1518 /core | |
parent | 5540ab8243a8488e30a21e1d4bb1720f1a9a555f (diff) | |
download | spark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.tar.gz spark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.tar.bz2 spark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.zip |
Fix bug with atomic update
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 10e70723db..483b6de34b 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -527,13 +527,22 @@ class BlockManager( // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - // Do atomically ! - val oldBlockOpt = blockInfo.putIfAbsent(blockId, myInfo) + val myInfo = { + val tinfo = new BlockInfo(level, tellMaster) + // Do atomically ! + val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + + if (oldBlockOpt.isDefined) { + if (oldBlockOpt.get.waitForReady()) { + logWarning("Block " + blockId + " already exists on this machine; not re-adding it") + return oldBlockOpt.get.size + } - if (oldBlockOpt.isDefined && oldBlockOpt.get.waitForReady()) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return oldBlockOpt.get.size + // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? + oldBlockOpt.get + } else { + tinfo + } } val startTimeMs = System.currentTimeMillis @@ -638,13 +647,22 @@ class BlockManager( // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - // Do atomically ! - val prevInfo = blockInfo.putIfAbsent(blockId, myInfo) - if (prevInfo != null) { - // Should we check for prevInfo.waitForReady() here ? - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return + val myInfo = { + val tinfo = new BlockInfo(level, tellMaster) + // Do atomically ! + val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + + if (oldBlockOpt.isDefined) { + if (oldBlockOpt.get.waitForReady()) { + logWarning("Block " + blockId + " already exists on this machine; not re-adding it") + return + } + + // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? + oldBlockOpt.get + } else { + tinfo + } } val startTimeMs = System.currentTimeMillis |