authorJosh Rosen <joshrosen@databricks.com>2016-03-02 10:26:47 -0800
committerAndrew Or <andrew@databricks.com>2016-03-02 10:26:47 -0800
commitd6969ffc0f86c8a4ea0e94d06cb227178b000962 (patch)
tree85c4daf02f27ad110d9cba1b713eff49c3f154e4 /streaming/src
parent8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91 (diff)
[SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication. Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.

This pull request replaces / subsumes #10748. /cc andrewor14 and nongli for review.

Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11436 from JoshRosen/remove-cachemanager.
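The core idea of an atomic get-or-else-update can be sketched in a few lines. This is a hypothetical, heavily simplified illustration of the pattern the commit message describes (cache hit returns the stored value; cache miss computes and stores it exactly once under a lock); the class and method names here are illustrative and are not Spark's real BlockManager API:

```scala
import scala.collection.mutable

// Simplified sketch: a keyed store with an atomic getOrElseUpdate.
// `synchronized` stands in for the block-level read/write locks
// that the real BlockManager change relies on.
class SimpleBlockStore[K, V] {
  private val blocks = mutable.Map[K, V]()

  // Returns the cached value if present; otherwise evaluates `compute`
  // once, stores the result, and returns it. Because the check and the
  // update happen under one lock, two concurrent callers cannot both
  // recompute the same block.
  def getOrElseUpdate(key: K)(compute: => V): V = synchronized {
    blocks.get(key) match {
      case Some(v) => v          // hit: no recomputation
      case None =>
        val v = compute          // miss: compute under the lock
        blocks.put(key, v)
        v
    }
  }
}
```

The point of making this a single atomic operation, rather than a separate get followed by a put (as the old CacheManager effectively did), is that the caller no longer has to coordinate locking across two calls.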
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4
1 file changed, 0 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3d9c085013..e22e320b17 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
-    } else {
-      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
-    } else {
-      blockManager.releaseLock(blockId)
     }
   }
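The removed `releaseLock` calls follow from the commit's changed locking semantics: after this change, a successful put no longer leaves the caller holding the block's write lock, so there is nothing for the receiver code to release. A minimal sketch of that contract, using illustrative names rather than Spark's real API, looks like this:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical store illustrating "put releases its own lock":
// callers no longer pair each successful put with a releaseLock.
class LockReleasingStore[K, V] {
  private val lock = new ReentrantReadWriteLock()
  private val data = mutable.Map[K, V]()

  // Acquires the write lock, stores the value, and always releases
  // the lock before returning, on success or failure alike.
  def put(key: K, value: V): Boolean = {
    lock.writeLock().lock()
    try {
      data.put(key, value)
      true
    } finally {
      lock.writeLock().unlock() // released here, not by the caller
    }
  }

  def get(key: K): Option[V] = {
    lock.readLock().lock()
    try data.get(key) finally lock.readLock().unlock()
  }
}
```

Under the old semantics, a caller that forgot the `releaseLock` after a successful put would leak the lock; moving the release inside the put removes that failure mode, which is exactly why the `else` branches above become dead code.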