aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-04-16 03:19:24 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-04-16 03:19:24 +0530
commitdd2b64ec97ad241b6f171cac0dbb1841b185675a (patch)
tree9032642536092f2029bcca87fc6b59d2287b1518 /core
parent5540ab8243a8488e30a21e1d4bb1720f1a9a555f (diff)
downloadspark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.tar.gz
spark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.tar.bz2
spark-dd2b64ec97ad241b6f171cac0dbb1841b185675a.zip
Fix bug with atomic update
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala44
1 files changed, 31 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 10e70723db..483b6de34b 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -527,13 +527,22 @@ class BlockManager(
// Remember the block's storage level so that we can correctly drop it to disk if it needs
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = new BlockInfo(level, tellMaster)
- // Do atomically !
- val oldBlockOpt = blockInfo.putIfAbsent(blockId, myInfo)
+ val myInfo = {
+ val tinfo = new BlockInfo(level, tellMaster)
+ // Do atomically !
+ val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
+
+ if (oldBlockOpt.isDefined) {
+ if (oldBlockOpt.get.waitForReady()) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return oldBlockOpt.get.size
+ }
- if (oldBlockOpt.isDefined && oldBlockOpt.get.waitForReady()) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return oldBlockOpt.get.size
+ // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
+ oldBlockOpt.get
+ } else {
+ tinfo
+ }
}
val startTimeMs = System.currentTimeMillis
@@ -638,13 +647,22 @@ class BlockManager(
// Remember the block's storage level so that we can correctly drop it to disk if it needs
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = new BlockInfo(level, tellMaster)
- // Do atomically !
- val prevInfo = blockInfo.putIfAbsent(blockId, myInfo)
- if (prevInfo != null) {
- // Should we check for prevInfo.waitForReady() here ?
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return
+ val myInfo = {
+ val tinfo = new BlockInfo(level, tellMaster)
+ // Do atomically !
+ val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
+
+ if (oldBlockOpt.isDefined) {
+ if (oldBlockOpt.get.waitForReady()) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return
+ }
+
+ // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
+ oldBlockOpt.get
+ } else {
+ tinfo
+ }
}
val startTimeMs = System.currentTimeMillis