From d6969ffc0f86c8a4ea0e94d06cb227178b000962 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 2 Mar 2016 10:26:47 -0800 Subject: [SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication. Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method. This pull request replaces / subsumes #10748. /cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods. Author: Josh Rosen Closes #11436 from JoshRosen/remove-cachemanager. --- .../org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4 ---- 1 file changed, 4 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 3d9c085013..e22e320b17 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler( if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") - } else { - blockManager.releaseLock(blockId) } BlockManagerBasedStoreResult(blockId, numRecords) } @@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler( if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") - } else { - blockManager.releaseLock(blockId) } } -- cgit v1.2.3