aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-11-09 16:28:45 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-11-09 16:28:45 -0800
commit355c8e4b17cc3e67b1e18cc24e74d88416b5779b (patch)
tree0cf988a5420b573aab367538ae21dff09cc014fc /core
parentfc3d0b602a08fdd182c2138506d1cd9952631f95 (diff)
downloadspark-355c8e4b17cc3e67b1e18cc24e74d88416b5779b.tar.gz
spark-355c8e4b17cc3e67b1e18cc24e74d88416b5779b.tar.bz2
spark-355c8e4b17cc3e67b1e18cc24e74d88416b5779b.zip
Fixed deadlock in BlockManager.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala111
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala79
2 files changed, 101 insertions, 89 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bd9155ef29..8c7b1417be 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -50,16 +50,6 @@ private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
-
-private[spark] class BlockLocker(numLockers: Int) {
- private val hashLocker = Array.fill(numLockers)(new Object())
-
- def getLock(blockId: String): Object = {
- return hashLocker(math.abs(blockId.hashCode % numLockers))
- }
-}
-
-
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val NUM_LOCKS = 337
- private val locker = new BlockLocker(NUM_LOCKS)
-
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+ private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ // Whether to compress broadcast variables that are stored
val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ // Whether to compress shuffle output that are stored
val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
@@ -150,28 +139,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
- locker.getLock(blockId).synchronized {
- val curLevel = blockInfo.get(blockId) match {
- case null =>
- StorageLevel.NONE
- case info =>
+ val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L)
+ case info =>
+ info.synchronized {
info.level match {
case null =>
- StorageLevel.NONE
+ (StorageLevel.NONE, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ (
+ new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
+ if (inMem) memoryStore.getSize(blockId) else 0L,
+ if (onDisk) diskStore.getSize(blockId) else 0L
+ )
}
- }
- master.mustHeartBeat(HeartBeat(
- blockManagerId,
- blockId,
- curLevel,
- if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
- if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
- logDebug("Told master about block " + blockId)
+ }
}
+ master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+ logDebug("Told master about block " + blockId)
}
/**
@@ -213,9 +201,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -273,9 +261,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
return None
}
@@ -298,9 +286,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -338,10 +326,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
+
return None
}
@@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Size of the block in bytes (to return to caller)
var size = 0L
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
null
}
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- val level = info.level
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
- data match {
- case Left(elements) =>
- diskStore.putValues(blockId, elements, level, false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
+ val level = info.level
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo("Writing block " + blockId + " to disk")
+ data match {
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
+ }
+ memoryStore.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId)
+ }
+ if (!level.useDisk) {
+ // The block is completely gone from this node; forget it so we can put() it again later.
+ blockInfo.remove(blockId)
}
}
- memoryStore.remove(blockId)
- if (info.tellMaster) {
- reportBlockStatus(blockId)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
- }
+ } else {
+ // The block has already been dropped
}
}
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 773970446a..09769d1f7d 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -17,13 +17,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
+ // Object used to ensure that only one thread is putting blocks and if necessary, dropping
+ // blocks from the memory store.
+ private val putLock = new Object()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- synchronized {
+ entries.synchronized {
entries.get(blockId).size
}
}
@@ -38,8 +41,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
tryToPut(blockId, elements, sizeEstimate, true)
} else {
val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(blockId, bytes.limit)
- synchronized { entries.put(blockId, entry) }
tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -63,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -76,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -90,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- synchronized {
+ entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -104,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def clear() {
- synchronized {
+ entries.synchronized {
entries.clear()
}
logInfo("MemoryStore cleared")
@@ -125,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
+ *
+ * Locks on the object putLock to ensure that all the put requests and its associated block
+ * dropping is done by only on thread at a time. Otherwise while one thread is dropping
+ * blocks to free memory for one block, another thread may use up the freed space for
+ * another block.
*/
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
- synchronized {
+ // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
+ // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
+ // released, it must be ensured that those to-be-dropped blocks are not double counted for
+ // freeing up more space for another block that needs to be put. Only then the actually dropping
+ // of blocks (and writing to disk if necessary) can proceed in parallel.
+ putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.put(blockId, entry)
+ entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
@@ -160,8 +171,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
- * might fill up before the caller puts in their new value.)
+ * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+ * Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
@@ -172,36 +183,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
- // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
- // in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
- val pair = iterator.next()
- val blockId = pair.getKey
- if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
- logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
- "block from the same RDD")
- return false
+ // This is synchronized to ensure that the set of entries is not changed
+ // (because of getValue or getBytes) while traversing the iterator, as that
+ // can lead to exceptions.
+ entries.synchronized {
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
+ }
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
}
- selectedBlocks += blockId
- selectedMemory += pair.getValue.size
}
if (maxMemory - (currentMemory - selectedMemory) >= space) {
logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) {
- val entry = entries.get(blockId)
- val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
- } else {
- Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ val entry = entries.synchronized { entries.get(blockId) }
+ // This should never be null as only one thread should be dropping
+ // blocks and removing entries. However the check is still here for
+ // future safety.
+ if (entry != null) {
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
}
- blockManager.dropFromMemory(blockId, data)
}
return true
} else {
@@ -212,7 +231,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def contains(blockId: String): Boolean = {
- synchronized { entries.containsKey(blockId) }
+ entries.synchronized { entries.containsKey(blockId) }
}
}