author    Tathagata Das <tathagata.das1565@gmail.com>    2012-11-09 14:09:37 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2012-11-09 14:09:37 -0800
commit    de00bc63dbc8db334f28fcb428e578919a9df7a1
tree      0d4535d4a0beda229df17eed0d059ecd024895d7
parent    6607f546ccadf307b0a862f1b52ab0b12316420d
Fixed deadlock in BlockManager.
1. Changed the locking structure of BlockManager by replacing the 337 coarse-grained hash-bucket locks with per-block fine-grained locks, using each block's BlockInfo object as its lock.
2. Changed the MemoryStore locking structure by making the block-putting threads lock on a separate object (not the MemoryStore itself), so that putting threads block getting threads as little as possible.
3. Added spark.storage.ThreadingTest to stress test the BlockManager using 5 block producer and 5 block consumer threads.
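
The shape of the locking change, as a minimal sketch (simplified: BlockInfo here is a bare lock object, while the real class also carries the storage level, a tellMaster flag, and a ready latch):

    import java.util.concurrent.ConcurrentHashMap

    object LockingSketch {
      class BlockInfo // per-block monitor; one instance per registered block

      // Before: a fixed pool of 337 locks shared by all blocks. Unrelated blocks
      // that hash to the same bucket contend on one monitor, and a put that must
      // drop another block whose bucket lock is already held can deadlock.
      private val hashLockers = Array.fill(337)(new Object)
      def oldLock(blockId: String): Object =
        hashLockers(math.abs(blockId.hashCode % 337))

      // After: each operation synchronizes on the block's own BlockInfo, so
      // threads working on different blocks never share a monitor.
      private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
      def withBlock[A](blockId: String)(body: => A): Option[A] =
        Option(blockInfo.get(blockId)).map(info => info.synchronized(body))
    }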
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala  | 111
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala   |  79
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala |  77
3 files changed, 180 insertions, 87 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bd9155ef29..bf52b510b4 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -50,16 +50,6 @@ private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
-
-private[spark] class BlockLocker(numLockers: Int) {
- private val hashLocker = Array.fill(numLockers)(new Object())
-
- def getLock(blockId: String): Object = {
- return hashLocker(math.abs(blockId.hashCode % numLockers))
- }
-}
-
-
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val NUM_LOCKS = 337
- private val locker = new BlockLocker(NUM_LOCKS)
-
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+ private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ // Whether to compress broadcast variables that are stored
val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ // Whether to compress shuffle output that is stored
val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
@@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
- locker.getLock(blockId).synchronized {
- val curLevel = blockInfo.get(blockId) match {
- case null =>
- StorageLevel.NONE
- case info =>
+
+ val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L)
+ case info =>
+ info.synchronized {
info.level match {
case null =>
- StorageLevel.NONE
+ (StorageLevel.NONE, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ (
+ new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
+ if (inMem) memoryStore.getSize(blockId) else 0L,
+ if (onDisk) diskStore.getSize(blockId) else 0L
+ )
}
- }
- master.mustHeartBeat(HeartBeat(
- blockManagerId,
- blockId,
- curLevel,
- if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
- if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
- logDebug("Told master about block " + blockId)
+ }
}
+ master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+ logDebug("Told master about block " + blockId)
}
/**
@@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
return None
}
@@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
return None
}
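
Both get paths above call info.waitForReady() before reading the level, because a block is registered in blockInfo before its data has actually been stored. A minimal sketch of that ready-latch handshake (the method names mirror the diff, but this class body is a stand-in assuming wait/notifyAll semantics like the real BlockInfo's):

    class ReadyLatchSketch {
      private var ready = false
      // Called by getters: blocks until the putting thread finishes storing data.
      def waitForReady(): Unit = synchronized { while (!ready) wait() }
      // Called by the putting thread once the block's bytes/values are in place.
      def markReady(): Unit = synchronized { ready = true; notifyAll() }
    }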
@@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Size of the block in bytes (to return to caller)
var size = 0L
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
null
}
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- val level = info.level
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
- data match {
- case Left(elements) =>
- diskStore.putValues(blockId, elements, level, false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
+ val level = info.level
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo("Writing block " + blockId + " to disk")
+ data match {
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
+ }
+ memoryStore.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId)
+ }
+ if (!level.useDisk) {
+ // The block is completely gone from this node; forget it so we can put() it again later.
+ blockInfo.remove(blockId)
}
}
- memoryStore.remove(blockId)
- if (info.tellMaster) {
- reportBlockStatus(blockId)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
- }
+ } else {
+ // The block has already been dropped
}
}
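
Note that the restructured reportBlockStatus earlier in this file also moves the master heartbeat outside the block lock. A sketch of that "compute under the lock, report after it" pattern (all names here are simplified stand-ins, not the real API):

    import java.util.concurrent.ConcurrentHashMap

    object ReportSketch {
      class Info(@volatile var inMemSize: Long, @volatile var onDiskSize: Long)
      private val blockInfo = new ConcurrentHashMap[String, Info]()

      private def heartBeat(blockId: String, inMem: Long, onDisk: Long): Unit =
        println(s"heartbeat: $blockId inMem=$inMem onDisk=$onDisk")

      def reportBlockStatus(blockId: String): Unit = {
        // Read this block's state while holding only its own lock...
        val (inMem, onDisk) = Option(blockInfo.get(blockId)) match {
          case None       => (0L, 0L)
          case Some(info) => info.synchronized((info.inMemSize, info.onDiskSize))
        }
        // ...then talk to the master after the lock is released, so a slow
        // master call cannot stall other threads touching the same block.
        heartBeat(blockId, inMem, onDisk)
      }
    }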
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 074ca2b8a4..241200c07f 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
+ // Object used to ensure that only one thread at a time is putting blocks and,
+ // if necessary, dropping blocks from the memory store.
+ private val putLock = new Object()
+
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- synchronized {
+ entries.synchronized {
entries.get(blockId).size
}
}
@@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- synchronized {
+ entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def clear() {
- synchronized {
+ entries.synchronized {
entries.clear()
}
logInfo("MemoryStore cleared")
@@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
+ *
+ * Locks on the object putLock to ensure that all put requests and their associated
+ * block dropping are done by only one thread at a time. Otherwise, while one thread
+ * is dropping blocks to free memory for one block, another thread may use up the
+ * freed space for another block.
*/
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
- synchronized {
+ // TODO: It's possible to optimize the locking by locking entries only when selecting blocks
+ // to be dropped. Once the to-be-dropped blocks have been selected and the lock on entries
+ // has been released, it must be ensured that those to-be-dropped blocks are not counted
+ // twice toward freeing up space for another block that needs to be put. Only then can the
+ // actual dropping of blocks (and writing to disk if necessary) proceed in parallel.
+ putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.put(blockId, entry)
+ entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
@@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
- * might fill up before the caller puts in their new value.)
+ * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+ * Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
@@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
- // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
- // in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
- val pair = iterator.next()
- val blockId = pair.getKey
- if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
- logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
- "block from the same RDD")
- return false
+ // This is synchronized to ensure that the set of entries is not changed
+ // (because of getValues or getBytes) while traversing the iterator, as that
+ // can lead to exceptions.
+ entries.synchronized {
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
+ }
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
}
- selectedBlocks += blockId
- selectedMemory += pair.getValue.size
}
if (maxMemory - (currentMemory - selectedMemory) >= space) {
logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) {
- val entry = entries.get(blockId)
- val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
- } else {
- Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ val entry = entries.synchronized { entries.get(blockId) }
+ // This should never be null, as only one thread should be dropping
+ // blocks and removing entries. However, the check is still here for
+ // future safety.
+ if (entry != null) {
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
}
- blockManager.dropFromMemory(blockId, data)
}
return true
} else {
@@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def contains(blockId: String): Boolean = {
- synchronized { entries.containsKey(blockId) }
+ entries.synchronized { entries.containsKey(blockId) }
}
}
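
Taken together, the MemoryStore now uses two levels of locking: putLock serializes put/evict decisions, while entries.synchronized guards only the map itself, so getters are blocked for as short a time as possible. A runnable sketch of that discipline, under the simplifying assumption that eviction just removes the oldest entries rather than calling back into BlockManager:

    import java.util.LinkedHashMap

    object MemoryStoreSketch {
      private case class Entry(value: Any, size: Long)
      private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
      private var currentMemory = 0L
      private val maxMemory = 1024L
      private val putLock = new Object

      def getSize(blockId: String): Long = entries.synchronized {
        Option(entries.get(blockId)).map(_.size).getOrElse(0L)
      }

      def tryToPut(blockId: String, value: Any, size: Long): Boolean =
        putLock.synchronized {
          // Only one thread at a time selects victims, so two concurrent puts
          // cannot both count the same freed space toward their own block.
          while (maxMemory - currentMemory < size && !entries.isEmpty) {
            val oldest = entries.synchronized(entries.keySet.iterator.next())
            val dropped = entries.synchronized(entries.remove(oldest))
            if (dropped != null) currentMemory -= dropped.size
          }
          if (maxMemory - currentMemory >= size) {
            entries.synchronized(entries.put(blockId, Entry(value, size)))
            currentMemory += size
            true
          } else false
        }
    }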
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
new file mode 100644
index 0000000000..13e2f20e64
--- /dev/null
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -0,0 +1,77 @@
+package spark.storage
+
+import akka.actor._
+
+import spark.KryoSerializer
+import java.util.concurrent.ArrayBlockingQueue
+import util.Random
+
+/**
+ * This class tests the BlockManager and MemoryStore for thread safety and
+ * deadlocks. It spawns a number of producer and consumer threads. Producer
+ * threads continuously push blocks into the BlockManager, and consumer
+ * threads continuously retrieve the blocks from the BlockManager and verify
+ * that each block is correct.
+ */
+private[spark] object ThreadingTest {
+
+ val numProducers = 5
+ val numBlocksPerProducer = 10000
+
+ private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
+ val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
+
+ override def run() {
+ for (i <- 1 to numBlocksPerProducer) {
+ val blockId = "b-" + id + "-" + i
+ val blockSize = Random.nextInt(1000)
+ val block = (1 to blockSize).map(_ => Random.nextInt())
+ val level = if (Random.nextBoolean()) StorageLevel.MEMORY_ONLY_SER else StorageLevel.MEMORY_AND_DISK
+ val startTime = System.currentTimeMillis()
+ manager.put(blockId, block.iterator, level, true)
+ println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ queue.add((blockId, block))
+ }
+ println("Producer thread " + id + " terminated")
+ }
+ }
+
+ private[spark] class ConsumerThread(manager: BlockManager, queue: ArrayBlockingQueue[(String, Seq[Int])]) extends Thread {
+ var numBlockConsumed = 0
+
+ override def run() {
+ println("Consumer thread started")
+ while (numBlockConsumed < numBlocksPerProducer) {
+ val (blockId, block) = queue.take()
+ val startTime = System.currentTimeMillis()
+ manager.get(blockId) match {
+ case Some(retrievedBlock) =>
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ case None =>
+ assert(false, "Block " + blockId + " could not be retrieved")
+ }
+ numBlockConsumed += 1
+ }
+ println("Consumer thread terminated")
+ }
+ }
+
+ def main(args: Array[String]) {
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
+ val actorSystem = ActorSystem("test")
+ val serializer = new KryoSerializer
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+ val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
+ val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
+ producers.foreach(_.start)
+ consumers.foreach(_.start)
+ producers.foreach(_.join)
+ consumers.foreach(_.join)
+ blockManager.stop()
+ blockManagerMaster.stop()
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+}