author     tdas <tathagata.das1565@gmail.com>  2012-11-11 22:56:14 +0000
committer  tdas <tathagata.das1565@gmail.com>  2012-11-11 22:56:14 +0000
commit     052d0b800ffe1bcfddc33a6fb3ad71e169b219bb (patch)
tree       21f6c62356ea0c07395a39b12f462a31497055f8 /core
parent     52d21cb682d1c4ca05e6823f8049ccedc3c5530c (diff)
parent     46222dc56db4a521bd613bd3fac5b91868bb339e (diff)
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala        | 123
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  | 281
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala         |  79
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala   |  30
4 files changed, 227 insertions(+), 286 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bd9155ef29..70d6d8369d 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -50,16 +50,6 @@ private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
-
-private[spark] class BlockLocker(numLockers: Int) {
- private val hashLocker = Array.fill(numLockers)(new Object())
-
- def getLock(blockId: String): Object = {
- return hashLocker(math.abs(blockId.hashCode % numLockers))
- }
-}
-
-
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
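The hunk above removes lock striping: BlockLocker hashed every block ID onto a fixed pool of 337 monitors, so two unrelated blocks whose IDs collided on a stripe serialized against each other. A minimal self-contained sketch of the removed scheme (illustrative, not the patch's code) shows the ceiling it imposed:

class StripedLocks(numStripes: Int) {
  private val stripes = Array.fill(numStripes)(new Object())
  // Every key hashing to the same slot shares one monitor.
  def lockFor(key: String): Object = stripes(math.abs(key.hashCode % numStripes))
}

val locks = new StripedLocks(337)
locks.lockFor("rdd_0_1").synchronized {
  // At most numStripes operations proceed in parallel, and unrelated blocks
  // can contend; the per-block info.synchronized adopted below removes both limits.
}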
@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val NUM_LOCKS = 337
- private val locker = new BlockLocker(NUM_LOCKS)
-
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+ private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ // Whether to compress broadcast variables that are stored
val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ // Whether to compress shuffle output that is stored
val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
@@ -131,8 +120,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* BlockManagerWorker actor.
*/
private def initialize() {
- master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory))
+ master.registerBlockManager(blockManagerId, maxMemory)
BlockManagerWorker.startBlockManagerWorker(this)
}
@@ -150,28 +138,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
- locker.getLock(blockId).synchronized {
- val curLevel = blockInfo.get(blockId) match {
- case null =>
- StorageLevel.NONE
- case info =>
+ val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L)
+ case info =>
+ info.synchronized {
info.level match {
case null =>
- StorageLevel.NONE
+ (StorageLevel.NONE, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ (
+ new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
+ if (inMem) memoryStore.getSize(blockId) else 0L,
+ if (onDisk) diskStore.getSize(blockId) else 0L
+ )
}
- }
- master.mustHeartBeat(HeartBeat(
- blockManagerId,
- blockId,
- curLevel,
- if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
- if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
- logDebug("Told master about block " + blockId)
+ }
}
+ master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
+ logDebug("Told master about block " + blockId)
}
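Note the narrowed lock scope in the rewrite: the level and sizes are computed while holding only that block's monitor, and the master round-trip happens with no lock held. A hedged sketch of the pattern (computeStatus is a stand-in helper, not a method in the patch):

def reportStatusSketch(blockId: String) {
  val (level, memSize, diskSize) = blockInfo.get(blockId) match {
    case null => (StorageLevel.NONE, 0L, 0L)
    case info => info.synchronized {
      computeStatus(info) // read-only work under the per-block lock
    }
  }
  // The potentially slow RPC runs unlocked, so it cannot stall other
  // threads reading or writing this (or any other) block.
  master.updateBlockInfo(blockManagerId, blockId, level, memSize, diskSize)
}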
/**
@@ -179,7 +166,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
- var managers = master.mustGetLocations(GetLocations(blockId))
+ var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@@ -190,8 +177,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
- val locations = master.mustGetLocationsMultipleBlockIds(
- GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
+ val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -213,9 +199,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -273,9 +259,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
return None
}
@@ -298,9 +284,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -338,10 +324,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
- } else {
- logDebug("Block " + blockId + " not registered locally")
}
+ } else {
+ logDebug("Block " + blockId + " not registered locally")
}
+
return None
}
@@ -354,7 +341,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
logDebug("Getting remote block " + blockId)
// Get locations of block
- val locations = master.mustGetLocations(GetLocations(blockId))
+ val locations = master.getLocations(blockId)
// Get block from remote locations
for (loc <- locations) {
@@ -583,7 +570,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Size of the block in bytes (to return to caller)
var size = 0L
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -681,7 +668,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
null
}
- locker.getLock(blockId).synchronized {
+ myInfo.synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -732,7 +719,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
- cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
@@ -779,26 +766,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- val level = info.level
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
- data match {
- case Left(elements) =>
- diskStore.putValues(blockId, elements, level, false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.synchronized {
+ val level = info.level
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo("Writing block " + blockId + " to disk")
+ data match {
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
+ }
+ memoryStore.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId)
+ }
+ if (!level.useDisk) {
+ // The block is completely gone from this node; forget it so we can put() it again later.
+ blockInfo.remove(blockId)
}
}
- memoryStore.remove(blockId)
- if (info.tellMaster) {
- reportBlockStatus(blockId)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
- }
+ } else {
+ // The block has already been dropped
}
}
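dropFromMemory now tolerates a missing entry, since a racing drop may already have removed it, and keeps the Either dispatch: Left carries deserialized values, Right already-serialized bytes. A hypothetical call-site sketch:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Deserialized block: the disk store serializes the values itself.
blockManager.dropFromMemory("rdd_0_0", Left(ArrayBuffer[Any](1, 2, 3)))
// Serialized block: the bytes are written through unchanged.
blockManager.dropFromMemory("rdd_0_1", Right(ByteBuffer.wrap(Array[Byte](1, 2, 3))))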
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index b3345623b3..4d5ee8318c 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -26,7 +26,7 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
-class HeartBeat(
+class UpdateBlockInfo(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -57,17 +57,17 @@ class HeartBeat(
}
private[spark]
-object HeartBeat {
+object UpdateBlockInfo {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): HeartBeat = {
- new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
- def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
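UpdateBlockInfo stays a plain class with var fields rather than a case class, which leaves room for custom wire serialization; the hand-written companion restores case-class ergonomics. A self-contained sketch of the pattern:

class Msg(var id: String, var size: Long)

object Msg {
  // Constructor sugar: Msg("a1", 400L) instead of new Msg("a1", 400L).
  def apply(id: String, size: Long): Msg = new Msg(id, size)
  // Extractor: lets receive blocks pattern-match Msg(id, size).
  def unapply(m: Msg): Option[(String, Long)] = Some((m.id, m.size))
}

Msg("a1", 400L) match {
  case Msg(id, size) => println(id + " -> " + size)
}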
@@ -182,8 +182,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
- case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@@ -233,7 +233,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
- private def heartBeat(
+ private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -245,7 +245,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
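Dispatch and handler keep the request/acknowledge contract: every mutating message is answered with sender ! true, which is exactly what the driver-side ask blocks on. A condensed sketch:

def receive = {
  case UpdateBlockInfo(managerId, blockId, level, memSize, diskSize) =>
    updateBlockInfo(managerId, blockId, level, memSize, diskSize) // handler replies: sender ! true
}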
@@ -350,211 +350,124 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
extends Logging {
- val AKKA_ACTOR_NAME: String = "BlockMasterManager"
- val REQUEST_RETRY_INTERVAL_MS = 100
- val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
- val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
- val DEFAULT_MANAGER_IP: String = Utils.localHostName()
- val DEFAULT_MANAGER_PORT: String = "10902"
-
+ val actorName = "BlockMasterManager"
val timeout = 10.seconds
- var masterActor: ActorRef = null
+ val maxAttempts = 5
- if (isMaster) {
- masterActor = actorSystem.actorOf(
- Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
+ var masterActor = if (isMaster) {
+ val actor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), name = actorName)
logInfo("Registered BlockManagerMaster Actor")
+ actor
} else {
- val url = "akka://spark@%s:%s/user/%s".format(
- DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+ val host = System.getProperty("spark.master.host", "localhost")
+ val port = System.getProperty("spark.master.port", "7077").toInt
+ val url = "akka://spark@%s:%s/user/%s".format(host, port, actorName)
+ val actor = actorSystem.actorFor(url)
logInfo("Connecting to BlockManagerMaster: " + url)
- masterActor = actorSystem.actorFor(url)
+ actor
}
- def stop() {
- if (masterActor != null) {
- communicate(StopBlockManagerMaster)
- masterActor = null
- logInfo("BlockManagerMaster stopped")
+ /**
+ * Send a message to the master actor and get its result within a default timeout, or
+ * throw a SparkException if this fails.
+ */
+ private def ask[T](message: Any): T = {
+ // TODO: Consider removing multiple attempts
+ if (masterActor == null) {
+ throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+ "[message = " + message + "]")
}
- }
-
- // Send a message to the master actor and get its result within a default timeout, or
- // throw a SparkException if this fails.
- def askMaster(message: Any): Any = {
- try {
- val future = masterActor.ask(message)(timeout)
- return Await.result(future, timeout)
- } catch {
- case e: Exception =>
- throw new SparkException("Error communicating with BlockManagerMaster", e)
+ var attempts = 0
+ var lastException: Exception = null
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ val future = masterActor.ask(message)(timeout)
+ val result = Await.result(future, timeout)
+ if (result == null) {
+ throw new Exception("BlockManagerMaster returned null")
+ }
+ return result.asInstanceOf[T]
+ } catch {
+ case ie: InterruptedException =>
+ throw ie
+ case e: Exception =>
+ lastException = e
+ logWarning(
+ "Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
+ }
+ Thread.sleep(100)
}
+ throw new SparkException(
+ "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
- // Send a one-way message to the master actor, to which we expect it to reply with true.
- def communicate(message: Any) {
- if (askMaster(message) != true) {
- throw new SparkException("Error reply received from BlockManagerMaster")
+ /**
+ * Send a one-way message to the master actor, to which we expect it to reply with true.
+ */
+ private def tell(message: Any) {
+ if (!ask[Boolean](message)) {
+ throw new SparkException("Telling master a message returned false")
}
}
- def notifyADeadHost(host: String) {
- communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
- logInfo("Removed " + host + " successfully in notifyADeadHost")
- }
-
- def mustRegisterBlockManager(msg: RegisterBlockManager) {
+ /**
+ * Register the BlockManager's id with the master
+ */
+ def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long) {
logInfo("Trying to register BlockManager")
- while (! syncRegisterBlockManager(msg)) {
- logWarning("Failed to register " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- logInfo("Done registering BlockManager")
- }
-
- def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
- //val masterActor = RemoteActor.select(node, name)
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- communicate(msg)
- logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
- logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return true
- } catch {
- case e: Exception =>
- logError("Failed in syncRegisterBlockManager", e)
- return false
- }
+ tell(RegisterBlockManager(blockManagerId, maxMemSize))
+ logInfo("Registered BlockManager")
}
- def mustHeartBeat(msg: HeartBeat) {
- while (! syncHeartBeat(msg)) {
- logWarning("Failed to send heartbeat" + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- }
-
- def syncHeartBeat(msg: HeartBeat): Boolean = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- communicate(msg)
- logDebug("Heartbeat sent successfully")
- logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return true
- } catch {
- case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return false
- }
+ def updateBlockInfo(
+ blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long
+ ) {
+ tell(UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+ logInfo("Updated info of block " + blockId)
}
- def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- var res = syncGetLocations(msg)
- while (res == null) {
- logInfo("Failed to get locations " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocations(msg)
- }
- return res
+ /** Get locations of the blockId from the master */
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ ask[Seq[BlockManagerId]](GetLocations(blockId))
}
- def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
- if (answer != null) {
- logDebug("GetLocations successful")
- logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocations")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetLocations failed", e)
- return null
- }
+ /** Get locations of multiple blockIds from the master */
+ def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ ask[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
- while (res == null) {
- logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocationsMultipleBlockIds(msg)
+ /** Get ids of other nodes in the cluster from the master */
+ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+ val result = ask[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ if (result.length != numPeers) {
+ throw new SparkException(
+ "Error getting peers, only got " + result.size + " instead of " + numPeers)
}
- return res
+ result
}
- def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
- if (answer != null) {
- logDebug("GetLocationsMultipleBlockIds successful")
- logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
- Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocationsMultipleBlockIds")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetLocationsMultipleBlockIds failed", e)
- return null
- }
+ /** Notify the master of a dead node */
+ def notifyADeadHost(host: String) {
+ tell(RemoveHost(host + ":10902"))
+ logInfo("Told BlockManagerMaster to remove dead host " + host)
}
- def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- var res = syncGetPeers(msg)
- while ((res == null) || (res.length != msg.size)) {
- logInfo("Failed to get peers " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetPeers(msg)
- }
-
- return res
+ /** Get the memory status from the master */
+ def getMemoryStatus(): Map[BlockManagerId, (Long, Long)] = {
+ ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
- def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
- if (answer != null) {
- logDebug("GetPeers successful")
- logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetPeers")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetPeers failed", e)
- return null
+ /** Stop the master actor, called only on the Spark master node */
+ def stop() {
+ if (masterActor != null) {
+ tell(StopBlockManagerMaster)
+ masterActor = null
+ logInfo("BlockManagerMaster stopped")
}
}
-
- def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
- }
}
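All the must*/sync* wrapper pairs collapse into the single ask[T] above. A condensed, self-contained restatement of that retry shape (imports assume a recent Akka; the 100 ms back-off and 5 attempts mirror the patch):

import akka.pattern.ask
import scala.concurrent.Await

private def askWithRetry[T](message: Any, maxAttempts: Int = 5): T = {
  var lastException: Exception = null
  var attempts = 0
  while (attempts < maxAttempts) {
    attempts += 1
    try {
      val future = masterActor.ask(message)(timeout)
      return Await.result(future, timeout).asInstanceOf[T]
    } catch {
      case ie: InterruptedException => throw ie // never retry through an interrupt
      case e: Exception => lastException = e    // remember, back off, retry
    }
    Thread.sleep(100)
  }
  throw new SparkException("Gave up on " + message + " after " + maxAttempts +
    " attempts", lastException)
}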
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 773970446a..09769d1f7d 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -17,13 +17,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
+ // Object used to ensure that only one thread is putting blocks and, if necessary, dropping
+ // blocks from the memory store.
+ private val putLock = new Object()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- synchronized {
+ entries.synchronized {
entries.get(blockId).size
}
}
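MemoryStore now works with two locks that have distinct jobs: putLock serializes whole put-plus-evict transactions, while entries.synchronized guards only structural access to the access-ordered LinkedHashMap. A minimal sketch of the discipline (Entry trimmed to what the sketch needs):

import java.util.LinkedHashMap

class TwoLockStore {
  private case class Entry(value: Any, size: Long)
  // accessOrder = true makes iteration order LRU, which eviction relies on.
  private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
  private val putLock = new Object()

  def get(id: String): Option[Entry] =
    entries.synchronized { Option(entries.get(id)) } // brief structural lock only

  def put(id: String, e: Entry): Unit = putLock.synchronized {
    // Eviction decisions would happen here, under putLock, so a racing put
    // cannot consume the space this thread just freed...
    entries.synchronized { entries.put(id, e) }      // ...map mutation stays short
  }
}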
@@ -38,8 +41,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
tryToPut(blockId, elements, sizeEstimate, true)
} else {
val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(blockId, bytes.limit)
- synchronized { entries.put(blockId, entry) }
tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -63,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -76,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -90,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- synchronized {
+ entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -104,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def clear() {
- synchronized {
+ entries.synchronized {
entries.clear()
}
logInfo("MemoryStore cleared")
@@ -125,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
+ *
+ * Locks on the object putLock to ensure that all the put requests and their associated block
+ * dropping is done by only one thread at a time. Otherwise, while one thread is dropping
+ * blocks to free memory for one block, another thread may use up the freed space for
+ * another block.
*/
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
- synchronized {
+ // TODO: It's possible to optimize the locking by locking entries only when selecting blocks
+ // to be dropped. Once the to-be-dropped blocks have been selected and the lock on entries has
+ // been released, it must be ensured that those to-be-dropped blocks are not double-counted for
+ // freeing up more space for another block that needs to be put. Only then can the actual
+ // dropping of blocks (and writing to disk if necessary) proceed in parallel.
+ putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.put(blockId, entry)
+ entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
@@ -160,8 +171,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
- * might fill up before the caller puts in their new value.)
+ * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+ * Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
@@ -172,36 +183,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
- // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
- // in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
- val pair = iterator.next()
- val blockId = pair.getKey
- if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
- logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
- "block from the same RDD")
- return false
+ // This is synchronized to ensure that the set of entries is not changed
+ // (because of getValues or getBytes) while traversing the iterator, as that
+ // can lead to a ConcurrentModificationException.
+ entries.synchronized {
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
+ }
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
}
- selectedBlocks += blockId
- selectedMemory += pair.getValue.size
}
if (maxMemory - (currentMemory - selectedMemory) >= space) {
logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) {
- val entry = entries.get(blockId)
- val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
- } else {
- Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ val entry = entries.synchronized { entries.get(blockId) }
+ // This should never be null, as only one thread should be dropping
+ // blocks and removing entries. However, the check is still here for
+ // future safety.
+ if (entry != null) {
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
}
- blockManager.dropFromMemory(blockId, data)
}
return true
} else {
@@ -212,7 +231,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def contains(blockId: String): Boolean = {
- synchronized { entries.containsKey(blockId) }
+ entries.synchronized { entries.containsKey(blockId) }
}
}
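The eviction rewrite above splits selection from dropping: victims are chosen under entries.synchronized so concurrent gets cannot reorder the access-ordered map mid-iteration, and the drops (possibly spilling to disk) run afterwards under putLock alone. A condensed sketch of the selection pass (Entry and rddIdOf stand in for the file's own helpers):

import scala.collection.mutable.ArrayBuffer

// Returns the LRU victims that free `needed` bytes, or None if eviction
// would cannibalize a block of the same RDD as the incoming one.
def selectVictims(needed: Long, incomingRdd: Option[String]): Option[Seq[String]] = {
  val selected = new ArrayBuffer[String]()
  var freed = 0L
  entries.synchronized {
    val it = entries.entrySet().iterator()
    while (freed < needed && it.hasNext) {
      val pair = it.next()
      if (incomingRdd.isDefined && incomingRdd == rddIdOf(pair.getKey)) {
        return None // same-RDD victim: give up rather than thrash cyclically
      }
      selected += pair.getKey
      freed += pair.getValue.size
    }
  }
  if (freed >= needed) Some(selected) else None // actual drops happen outside this lock
}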
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index b9c19e61cd..0e78228134 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -20,9 +20,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldOops: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
+
actorSystem = ActorSystem("test")
master = new BlockManagerMaster(actorSystem, true, true)
@@ -55,7 +57,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
- test("manager-master interaction") {
+ test("master + 1 manager interaction") {
store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -72,17 +74,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ assert(master.getLocations("a1").size === 1, "master was not told about a1")
+ assert(master.getLocations("a2").size === 1, "master was not told about a2")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.getLocations("a1").size === 0, "master did not remove a1")
+ assert(master.getLocations("a2").size === 0, "master did not remove a2")
+ }
+
+ test("master + 2 managers interaction") {
+ store = new BlockManager(master, serializer, 2000)
+ val otherStore = new BlockManager(master, new KryoSerializer, 2000)
+
+ val peers = master.getPeers(store.blockManagerId, 1)
+ assert(peers.size === 1, "master did not return the other manager as a peer")
+ assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
+
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+ assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
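The two-manager test exercises replication: the _2 levels request replication factor 2, so each put should surface two locations at the master. A hypothetical extension (not in the patch) would assert a third replica the same way, using the four-argument StorageLevel constructor seen earlier:

val thirdStore = new BlockManager(master, new KryoSerializer, 2000)
val memoryOnly3 = new StorageLevel(false, true, true, 3) // useDisk, useMemory, deserialized, replication
store.putSingle("a3", new Array[Byte](400), memoryOnly3)
assert(master.getLocations("a3").size === 3, "master should report 3 locations for a3")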
test("in-memory LRU storage") {