author    Davies Liu <davies@databricks.com>    2016-05-19 11:47:17 -0700
committer Andrew Or <andrew@databricks.com>    2016-05-19 11:47:17 -0700
commit    ad182086cc3bd7951aaf82693d9bcb56815b43e4 (patch)
tree      14e801ca91c5b50ee1a36136dfe66c2d87a7d55c /core
parent    ef7a5e0bcaee45b907a10b73f11c838ef6e23614 (diff)
[SPARK-15300] Fix writer lock conflict when removing a block
## What changes were proposed in this pull request?

A writer lock can be acquired when 1) creating a new block, 2) removing a block, or 3) evicting a block to disk. 1) and 3) can happen at the same time within the same task, and all of them can happen at the same time outside a task. It is fine for a caller to try to grab the write lock for a block while that block is already locked by another holder with the same task attempt id. This PR removes the check that threw an exception in that case.

## How was this patch tested?

Updated existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #13082 from davies/write_lock_conflict.
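For orientation, here is a minimal, hypothetical sketch of the locking rule after this change (simplified; `SimpleBlockInfoManager` and `tryLockForWriting` are illustrative names, while `writerTask`, `readerCount`, and `NO_WRITER` follow the diff below): a write lock is granted only when the block has no writer and no readers, and a conflicting request, including one from the same task attempt, now simply fails to acquire the lock instead of throwing `IllegalStateException`.

```scala
// Simplified, illustrative sketch of the post-change locking rule; not the
// actual Spark source. Field names mirror the diff below.
object BlockInfo { val NO_WRITER: Long = -1L }

class BlockInfo {
  var writerTask: Long = BlockInfo.NO_WRITER  // task attempt id holding the write lock
  var readerCount: Int = 0                    // number of outstanding read locks
}

class SimpleBlockInfoManager {
  private val infos = scala.collection.mutable.HashMap.empty[String, BlockInfo]

  /** Try to acquire the write lock for `blockId` on behalf of `taskAttemptId`.
   *  Returns Some(info) on success; returns None when the block is missing or
   *  already locked (by any holder, including the caller), rather than throwing. */
  def tryLockForWriting(blockId: String, taskAttemptId: Long): Option[BlockInfo] =
    infos.get(blockId) match {
      case Some(info) if info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0 =>
        info.writerTask = taskAttemptId
        Some(info)
      case _ => None
    }
}
```

The updated test below mirrors this non-throwing behavior: a second `lockForWriting` call on an already write-locked block is expected to come back empty rather than raise `IllegalStateException`.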
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala      | 5
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala | 6
2 files changed, 3 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index ca53534b61..20ffe1342e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -228,10 +228,7 @@ private[storage] class BlockInfoManager extends Logging {
infos.get(blockId) match {
case None => return None
case Some(info) =>
- if (info.writerTask == currentTaskAttemptId) {
- throw new IllegalStateException(
- s"Task $currentTaskAttemptId has already locked $blockId for writing")
- } else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
+ if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
info.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 9ee83b76e7..1b325801e2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -208,16 +208,14 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}
- test("cannot call lockForWriting while already holding a write lock") {
+ test("cannot grab a writer lock while already holding a write lock") {
withTaskId(0) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
blockInfoManager.unlock("block")
}
withTaskId(1) {
assert(blockInfoManager.lockForWriting("block").isDefined)
- intercept[IllegalStateException] {
- blockInfoManager.lockForWriting("block")
- }
+ assert(blockInfoManager.lockForWriting("block", false).isEmpty)
blockInfoManager.assertBlockIsLockedForWriting("block")
}
}