aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorhushan[胡珊] <hushan@xiaomi.com>2014-12-09 15:11:20 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-09 15:54:40 -0800
commit30dca924df0efbdc1b638fa7c705fe8743570783 (patch)
treeb6d36c88d0a676c70b73a0efe65eda5edfb6adf8 /core
parent1f5110630c1abb13a357b463c805a39772923b82 (diff)
downloadspark-30dca924df0efbdc1b638fa7c705fe8743570783.tar.gz
spark-30dca924df0efbdc1b638fa7c705fe8743570783.tar.bz2
spark-30dca924df0efbdc1b638fa7c705fe8743570783.zip
[SPARK-4714] BlockManager.dropFromMemory() should check whether block has been removed after synchronizing on BlockInfo instance.
After synchronizing on the `info` lock in the `removeBlock`/`dropOldBlocks`/`dropFromMemory` methods in BlockManager, the block that `info` represented may have already removed. The three methods have the same logic to get the `info` lock: ``` info = blockInfo.get(id) if (info != null) { info.synchronized { // do something } } ``` So, there is chance that when a thread enters the `info.synchronized` block, `info` has already been removed from the `blockInfo` map by some other thread who entered `info.synchronized` first. The `removeBlock` and `dropOldBlocks` methods are idempotent, so it's safe for them to run on blocks that have already been removed. But in `dropFromMemory` it may be problematic since it may drop block data which already removed into the diskstore, and this calls data store operations that are not designed to handle missing blocks. This patch fixes this issue by adding a check to `dropFromMemory` to test whether blocks have been removed by a racing thread. Author: hushan[胡珊] <hushan@xiaomi.com> Closes #3574 from suyanNone/refine-block-concurrency and squashes the following commits: edb989d [hushan[胡珊]] Refine code style and comments position 55fa4ba [hushan[胡珊]] refine code e57e270 [hushan[胡珊]] add check info is already remove or not while having gotten info.syn
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 308c59eda5..d7b184f8a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1014,8 +1014,10 @@ private[spark] class BlockManager(
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
+ } else if (blockInfo.get(blockId).isEmpty) {
+ logWarning(s"Block $blockId was already dropped.")
+ return None
}
-
var blockIsUpdated = false
val level = info.level