author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-12-20 11:33:38 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-12-20 11:33:38 -0800
commit    5e51b889feac28787bfdb546b04d4b3752f3d8d1 (patch)
tree      0b0d097787e7d479632252e1a188560370aec628 /core
parent    e7051767f70de0b1696a6d540a71492820988c38 (diff)
parent    9397c5014e17a96c3cf24661c0edb40e524589e7 (diff)
Merge pull request #327 from rxin/spark-633
Added the ability in block manager to remove blocks.
Diffstat (limited to 'core')
 core/src/main/scala/spark/SparkEnv.scala                         |  11
 core/src/main/scala/spark/storage/BlockManager.scala             | 202
 core/src/main/scala/spark/storage/BlockManagerId.scala           |  48
 core/src/main/scala/spark/storage/BlockManagerMaster.scala       | 727
 core/src/main/scala/spark/storage/BlockManagerMasterActor.scala  | 401
 core/src/main/scala/spark/storage/BlockManagerMessages.scala     | 102
 core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala   |  16
 core/src/main/scala/spark/storage/BlockStore.scala               |   7
 core/src/main/scala/spark/storage/DiskStore.scala                |   5
 core/src/main/scala/spark/storage/MemoryStore.scala              |   5
 core/src/main/scala/spark/storage/StorageLevel.scala             |  32
 core/src/main/scala/spark/storage/ThreadingTest.scala            |  13
 core/src/main/scala/spark/util/IdGenerator.scala                 |  14
 core/src/main/scala/spark/util/MetadataCleaner.scala             |  35
 core/src/main/scala/spark/util/TimeStampedHashMap.scala          |  87
 core/src/test/scala/spark/storage/BlockManagerSuite.scala        | 160
 16 files changed, 1096 insertions(+), 769 deletions(-)
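
The headline change: the master can now tell slaves to drop a block. Each BlockManager registers a BlockManagerSlaveActor with the master, and a new RemoveBlock message flows from the master to the slaves that hold the block. A minimal driver-side sketch of the new path, assuming a running SparkEnv (the env handle and block id are illustrative, not from this diff):

    // Hedged usage sketch; `SparkEnv.get` and the block id are assumptions.
    val env = spark.SparkEnv.get
    // Asks the BlockManagerMasterActor to forward RemoveBlock("rdd_0_0") to
    // every BlockManagerSlaveActor whose node currently stores that block.
    env.blockManager.master.removeBlock("rdd_0_0")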
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 272d7cdad3..41441720a7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -86,10 +86,13 @@ object SparkEnv extends Logging {
}
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
-
- val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
+
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(
+ actorSystem, isMaster, isLocal, masterIp, masterPort)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
-
+
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isMaster)
@@ -104,7 +107,7 @@ object SparkEnv extends Logging {
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-
+
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
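
This hunk moves resolution of the master's address into SparkEnv: the caller now reads spark.master.host and spark.master.port and passes them to BlockManagerMaster explicitly, replacing the DEFAULT_MASTER_IP/DEFAULT_MASTER_PORT lookups removed from that class further down. A small sketch of the property contract; the host value is an example only:

    // Property names come from the hunk above; the values are illustrative.
    System.setProperty("spark.master.host", "127.0.0.1")
    System.setProperty("spark.master.port", "7077")
    val masterIp: String = System.getProperty("spark.master.host", "localhost")
    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt

Taking the address as constructor arguments also makes BlockManagerMaster testable without mutating global properties.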
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index df295b1820..7a8ac10cdd 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,59 +1,39 @@
package spark.storage
-import akka.actor.{ActorSystem, Cancellable}
+import java.io.{InputStream, OutputStream}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import akka.actor.{ActorSystem, Cancellable, Props}
import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
-import java.nio.{MappedByteBuffer, ByteBuffer}
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
- def this() = this(null, 0) // For deserialization only
-
- def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
-
- override def writeExternal(out: ObjectOutput) {
- out.writeUTF(ip)
- out.writeInt(port)
- }
+import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
- override def readExternal(in: ObjectInput) {
- ip = in.readUTF()
- port = in.readInt()
- }
-
- override def toString = "BlockManagerId(" + ip + ", " + port + ")"
-
- override def hashCode = ip.hashCode * 41 + port
+import sun.nio.ch.DirectBuffer
- override def equals(that: Any) = that match {
- case id: BlockManagerId => port == id.port && ip == id.ip
- case _ => false
- }
-}
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
- val serializer: Serializer, maxMemory: Long)
+class BlockManager(
+ actorSystem: ActorSystem,
+ val master: BlockManagerMaster,
+ val serializer: Serializer,
+ maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -79,7 +59,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
}
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -110,16 +90,20 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+ name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
+
@volatile private var shuttingDown = false
private def heartBeat() {
- if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+ if (!master.sendHeartBeat(blockManagerId)) {
reregister()
}
}
var heartBeatTask: Cancellable = null
+ val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
@@ -134,8 +118,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
* BlockManagerWorker actor.
*/
private def initialize() {
- master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory))
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@@ -156,8 +139,8 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
*/
private def reportAllBlocks() {
logInfo("Reporting " + blockInfo.size + " blocks to the master.")
- for (blockId <- blockInfo.keys) {
- if (!tryToReportBlockStatus(blockId)) {
+ for ((blockId, info) <- blockInfo) {
+ if (!tryToReportBlockStatus(blockId, info)) {
logError("Failed to report " + blockId + " to master; giving up.")
return
}
@@ -171,26 +154,22 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
def reregister() {
// TODO: We might need to rate limit reregistering.
logInfo("BlockManager reregistering with master")
- master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory))
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
reportAllBlocks()
}
/**
* Get storage level of local block. If no info exists for the block, then returns null.
*/
- def getLevel(blockId: String): StorageLevel = {
- val info = blockInfo.get(blockId)
- if (info != null) info.level else null
- }
+ def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
/**
* Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
- def reportBlockStatus(blockId: String) {
- val needReregister = !tryToReportBlockStatus(blockId)
+ def reportBlockStatus(blockId: String, info: BlockInfo) {
+ val needReregister = !tryToReportBlockStatus(blockId, info)
if (needReregister) {
logInfo("Got told to reregister updating block " + blockId)
// Reregistering will report our new block for free.
@@ -200,33 +179,27 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
/**
- * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the
- * block was successfully recorded and false if the slave needs to re-register.
+ * Actually send an UpdateBlockInfo message. Returns the master's response,
+ * which will be true if the block was successfully recorded and false if
+ * the slave needs to re-register.
*/
- private def tryToReportBlockStatus(blockId: String): Boolean = {
- val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
- case null =>
- (StorageLevel.NONE, 0L, 0L, false)
- case info =>
- info.synchronized {
- info.level match {
- case null =>
- (StorageLevel.NONE, 0L, 0L, false)
- case level =>
- val inMem = level.useMemory && memoryStore.contains(blockId)
- val onDisk = level.useDisk && diskStore.contains(blockId)
- (
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
- if (inMem) memoryStore.getSize(blockId) else 0L,
- if (onDisk) diskStore.getSize(blockId) else 0L,
- info.tellMaster
- )
- }
- }
+ private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = {
+ val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+ info.level match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L, false)
+ case level =>
+ val inMem = level.useMemory && memoryStore.contains(blockId)
+ val onDisk = level.useDisk && diskStore.contains(blockId)
+ val storageLevel = new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
+ val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
+ (storageLevel, memSize, diskSize, info.tellMaster)
+ }
}
if (tellMaster) {
- master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+ master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
} else {
true
}
@@ -238,7 +211,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
- var managers = master.mustGetLocations(GetLocations(blockId))
+ var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@@ -249,8 +222,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
- val locations = master.mustGetLocationsMultipleBlockIds(
- GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
+ val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -272,7 +244,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -357,7 +329,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -413,7 +385,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
logDebug("Getting remote block " + blockId)
// Get locations of block
- val locations = master.mustGetLocations(GetLocations(blockId))
+ val locations = master.getLocations(blockId)
// Get block from remote locations
for (loc <- locations) {
@@ -615,7 +587,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
throw new IllegalArgumentException("Storage level is null or invalid")
}
- val oldBlock = blockInfo.get(blockId)
+ val oldBlock = blockInfo.get(blockId).orNull
if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
oldBlock.waitForReady()
@@ -670,7 +642,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
// and tell the master about it.
myInfo.markReady(size)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -716,7 +688,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
+ if (blockInfo.contains(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
@@ -757,7 +729,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
// and tell the master about it.
myInfo.markReady(bytes.limit)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
@@ -791,7 +763,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
- cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
@@ -838,7 +810,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
val level = info.level
@@ -851,9 +823,12 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
diskStore.putBytes(blockId, bytes, level)
}
}
- memoryStore.remove(blockId)
+ val blockWasRemoved = memoryStore.remove(blockId)
+ if (!blockWasRemoved) {
+ logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+ }
if (info.tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, info)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
@@ -865,6 +840,53 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
}
+ /**
+ * Remove a block from both memory and disk.
+ */
+ def removeBlock(blockId: String) {
+ logInfo("Removing block " + blockId)
+ val info = blockInfo.get(blockId).orNull
+ if (info != null) info.synchronized {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning("Block " + blockId + " could not be removed as it was not found in either " +
+ "the disk or memory store")
+ }
+ blockInfo.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId, info)
+ }
+ } else {
+ // The block has already been removed; do nothing.
+ logWarning("Asked to remove block " + blockId + ", which does not exist")
+ }
+ }
+
+ def dropOldBlocks(cleanupTime: Long) {
+ logInfo("Dropping blocks older than " + cleanupTime)
+ val iterator = blockInfo.internalMap.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+ if (time < cleanupTime) {
+ info.synchronized {
+ val level = info.level
+ if (level.useMemory) {
+ memoryStore.remove(id)
+ }
+ if (level.useDisk) {
+ diskStore.remove(id)
+ }
+ iterator.remove()
+ logInfo("Dropped block " + id)
+ }
+ reportBlockStatus(id, info)
+ }
+ }
+ }
+
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle
@@ -914,6 +936,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
heartBeatTask.cancel()
}
connectionManager.stop()
+ master.actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
@@ -923,6 +946,9 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
private[spark]
object BlockManager extends Logging {
+
+ val ID_GENERATOR = new IdGenerator
+
def getMaxMemoryFromSystemProperties: Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
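
Three threads run through this file's diff: blockInfo becomes a TimeStampedHashMap so that dropOldBlocks (new above) can evict blocks by insertion age, a MetadataCleaner drives that eviction periodically, and removeBlock gives an explicit path that clears a block from both stores and reports the change to the master. A rough sketch of the map contract dropOldBlocks relies on; the (value, timestamp) layout matches the entry.getValue._1 / ._2 accesses above, the rest is an assumption about TimeStampedHashMap's internals:

    import java.util.concurrent.ConcurrentHashMap

    // Hedged sketch, not the real spark.util.TimeStampedHashMap.
    class TimeStampedHashMapSketch[K, V] {
      // Every entry remembers when it was inserted.
      val internalMap = new ConcurrentHashMap[K, (V, Long)]()
      def put(k: K, v: V) { internalMap.put(k, (v, System.currentTimeMillis())) }
    }

    // MetadataCleaner("BlockManager", dropOldBlocks) then periodically invokes
    // the callback with a cutoff, e.g. dropOldBlocks(now - delayMs), and every
    // block inserted before the cutoff is dropped from memory and disk.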
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
new file mode 100644
index 0000000000..488679f049
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
+import java.util.concurrent.ConcurrentHashMap
+
+
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+ def this() = this(null, 0) // For deserialization only
+
+ def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+
+ override def writeExternal(out: ObjectOutput) {
+ out.writeUTF(ip)
+ out.writeInt(port)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ ip = in.readUTF()
+ port = in.readInt()
+ }
+
+ @throws(classOf[IOException])
+ private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
+
+ override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+
+ override def hashCode = ip.hashCode * 41 + port
+
+ override def equals(that: Any) = that match {
+ case id: BlockManagerId => port == id.port && ip == id.ip
+ case _ => false
+ }
+}
+
+
+private[spark] object BlockManagerId {
+
+ val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+
+ def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+ if (blockManagerIdCache.containsKey(id)) {
+ blockManagerIdCache.get(id)
+ } else {
+ blockManagerIdCache.put(id, id)
+ id
+ }
+ }
+}
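
BlockManagerId is now interned: readResolve routes every deserialized instance through the cache above, so the master holds one canonical object per (ip, port) instead of a fresh copy per message. The containsKey-then-put sequence is not atomic, though, so two concurrent deserializations can both miss and both put. A sketch of the same idea built on putIfAbsent, shown as an illustrative variant rather than the diff's code:

    import java.util.concurrent.ConcurrentHashMap

    object InternSketch {
      private val cache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
      def intern(id: BlockManagerId): BlockManagerId = {
        val existing = cache.putIfAbsent(id, id)  // atomic: first writer wins
        if (existing == null) id else existing    // later callers share the winner
      }
    }

The race in the original is benign (the two copies are equal; one merely escapes the cache), which is presumably why the simpler form was acceptable.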
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0a4e68f437..a3d8671834 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,676 +1,167 @@
package spark.storage
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.ArrayBuffer
import scala.util.Random
-import akka.actor._
-import akka.dispatch._
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote._
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long)
- extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class BlockUpdate(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeInt(memSize.toInt)
- out.writeInt(diskSize.toInt)
- }
-
- override def readExternal(in: ObjectInput) {
- blockManagerId = new BlockManagerId()
- blockManagerId.readExternal(in)
- blockId = in.readUTF()
- storageLevel = new StorageLevel()
- storageLevel.readExternal(in)
- memSize = in.readInt()
- diskSize = in.readInt()
- }
-}
-
-private[spark]
-object BlockUpdate {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): BlockUpdate = {
- new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
-
- // For pattern-matching
- def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
-
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
-
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-
-private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long) {
- private var _lastSeenMs = timeMs
- private var _remainingMem = maxMem
- private val _blocks = new JHashMap[String, StorageLevel]
-
- logInfo("Registering block manager %s:%d with %s RAM".format(
- blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis()
- }
-
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
-
- updateLastSeenMs()
-
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId)
-
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- }
- }
-
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, storageLevel)
- if (storageLevel.useMemory) {
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- logInfo("Added %s on disk on %s:%d (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val originalLevel: StorageLevel = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (originalLevel.useDisk) {
- logInfo("Removed %s on %s:%d on disk (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- }
- }
-
- def remainingMem: Long = _remainingMem
-
- def lastSeenMs: Long = _lastSeenMs
-
- def blocks: JHashMap[String, StorageLevel] = _blocks
-
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
- def clear() {
- _blocks.clear()
- }
- }
-
- private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
- private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
- private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
-
- initLogging()
-
- val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
- "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
- val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
- "5000").toLong
-
- var timeoutCheckingTask: Cancellable = null
+private[spark] class BlockManagerMaster(
+ val actorSystem: ActorSystem,
+ isMaster: Boolean,
+ isLocal: Boolean,
+ masterIp: String,
+ masterPort: Int)
+ extends Logging {
- override def preStart() {
- if (!BlockManager.getDisableHeartBeatsForTesting) {
- timeoutCheckingTask = context.system.scheduler.schedule(
- 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
- }
- super.preStart()
- }
+ val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
- def removeBlockManager(blockManagerId: BlockManagerId) {
- val info = blockManagerInfo(blockManagerId)
- blockManagerIdByHost.remove(blockManagerId.ip)
- blockManagerInfo.remove(blockManagerId)
- var iterator = info.blocks.keySet.iterator
- while (iterator.hasNext) {
- val blockId = iterator.next
- val locations = blockInfo.get(blockId)._2
- locations -= blockManagerId
- if (locations.size == 0) {
- blockInfo.remove(locations)
- }
- }
- }
-
- def expireDeadHosts() {
- logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
- val now = System.currentTimeMillis()
- val minSeenTime = now - slaveTimeout
- val toRemove = new HashSet[BlockManagerId]
- for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
- toRemove += info.blockManagerId
- }
- }
- // TODO: Remove corresponding block infos
- toRemove.foreach(removeBlockManager)
- }
-
- def removeHost(host: String) {
- logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
- logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- blockManagerIdByHost.get(host).foreach(removeBlockManager)
- logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
- sender ! true
- }
+ val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+ val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
+ val DEFAULT_MANAGER_IP: String = Utils.localHostName()
- def heartBeat(blockManagerId: BlockManagerId) {
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- sender ! true
- } else {
- sender ! false
- }
+ val timeout = 10.seconds
+ var masterActor: ActorRef = {
+ if (isMaster) {
+ val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = MASTER_AKKA_ACTOR_NAME)
+ logInfo("Registered BlockManagerMaster Actor")
+ masterActor
} else {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
+ val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+ logInfo("Connecting to BlockManagerMaster: " + url)
+ actorSystem.actorFor(url)
}
}
- def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize) =>
- register(blockManagerId, maxMemSize)
-
- case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
- case GetLocations(blockId) =>
- getLocations(blockId)
-
- case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
-
- case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
-
- case GetMemoryStatus =>
- getMemoryStatus
-
- case RemoveHost(host) =>
- removeHost(host)
- sender ! true
-
- case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
- sender ! true
- if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel
- }
- context.stop(self)
-
- case ExpireDeadHosts =>
- expireDeadHosts()
-
- case HeartBeat(blockManagerId) =>
- heartBeat(blockManagerId)
-
- case other =>
- logInfo("Got unknown message: " + other)
+ /** Remove a dead host from the master actor. This is only called on the master side. */
+ def notifyADeadHost(host: String) {
+ tell(RemoveHost(host))
+ logInfo("Removed " + host + " successfully in notifyADeadHost")
}
- // Return a map from the block manager id to max memory and remaining memory.
- private def getMemoryStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
- (blockManagerId, (info.maxMem, info.remainingMem))
- }.toMap
- sender ! res
+ /**
+ * Send the master actor a heart beat from the slave. Returns true if everything works out,
+ * false if the master does not know about the given block manager, which means the block
+ * manager should re-register.
+ */
+ def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
+ askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " "
- logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockManagerIdByHost.contains(blockManagerId.ip) &&
- blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
- val oldId = blockManagerIdByHost(blockManagerId.ip)
- logInfo("Got second registration for host " + blockManagerId +
- "; removing old slave " + oldId)
- removeBlockManager(oldId)
- }
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- logInfo("Got Register Msg from master node, don't register it")
- } else {
- blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis(), maxMemSize))
- }
- blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
- logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
+ /** Register the BlockManager's id with the master. */
+ def registerBlockManager(
+ blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ logInfo("Trying to register BlockManager")
+ tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+ logInfo("Registered BlockManager")
}
- private def blockUpdate(
+ def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long) {
-
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " " + blockId + " "
-
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- // We intentionally do not register the master (except in local mode),
- // so we should not indicate failure.
- sender ! true
- } else {
- sender ! false
- }
- return
- }
-
- if (blockId == null) {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
- return
- }
-
- blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
- var locations: HashSet[BlockManagerId] = null
- if (blockInfo.containsKey(blockId)) {
- locations = blockInfo.get(blockId)._2
- } else {
- locations = new HashSet[BlockManagerId]
- blockInfo.put(blockId, (storageLevel.replication, locations))
- }
-
- if (storageLevel.isValid) {
- locations += blockManagerId
- } else {
- locations.remove(blockManagerId)
- }
-
- if (locations.size == 0) {
- blockInfo.remove(blockId)
- }
- sender ! true
+ diskSize: Long): Boolean = {
+ val res = askMasterWithRetry[Boolean](
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+ logInfo("Updated info of block " + blockId)
+ res
}
- private def getLocations(blockId: String) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockId + " "
- logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
- + Utils.getUsedTimeMs(startTimeMs))
- sender ! res.toSeq
- } else {
- logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- sender ! res
- }
+ /** Get locations of the blockId from the master */
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
- private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- val tmp = blockId
- logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
- return res.toSeq
- } else {
- logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- return res.toSeq
- }
- }
-
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
- var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
- for (blockId <- blockIds) {
- res.append(getLocations(blockId))
- }
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
- sender ! res.toSeq
+ /** Get locations of multiple blockIds from the master */
+ def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(peers)
- res -= blockManagerId
- val rand = new Random(System.currentTimeMillis())
- while (res.length > size) {
- res.remove(rand.nextInt(res.length))
+ /** Get ids of other nodes in the cluster from the master */
+ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+ val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ if (result.length != numPeers) {
+ throw new SparkException(
+ "Error getting peers, only got " + result.size + " instead of " + numPeers)
}
- sender ! res.toSeq
+ result
}
- private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-
- val peersWithIndices = peers.zipWithIndex
- val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
- if (selfIndex == -1) {
- throw new Exception("Self index for " + blockManagerId + " not found")
- }
-
- var index = selfIndex
- while (res.size < size) {
- index += 1
- if (index == selfIndex) {
- throw new Exception("More peer expected than available")
- }
- res += peers(index % peers.size)
- }
- sender ! res.toSeq
+ /**
+ * Remove a block from the slaves that have it. This can only be used to remove
+ * blocks that the master knows about.
+ */
+ def removeBlock(blockId: String) {
+ askMasterWithRetry(RemoveBlock(blockId))
}
-}
-
-private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
- extends Logging {
-
- val AKKA_ACTOR_NAME: String = "BlockMasterManager"
- val REQUEST_RETRY_INTERVAL_MS = 100
- val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
- val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
- val DEFAULT_MANAGER_IP: String = Utils.localHostName()
- val timeout = 10.seconds
- var masterActor: ActorRef = null
-
- if (isMaster) {
- masterActor = actorSystem.actorOf(
- Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
- logInfo("Registered BlockManagerMaster Actor")
- } else {
- val url = "akka://spark@%s:%s/user/%s".format(
- DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
- logInfo("Connecting to BlockManagerMaster: " + url)
- masterActor = actorSystem.actorFor(url)
+ /**
+ * Return the memory status for each block manager, in the form of a map from
+ * the block manager's id to two long values. The first value is the maximum
+ * amount of memory allocated for the block manager, while the second is the
+ * amount of remaining memory.
+ */
+ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
+ /** Stop the master actor, called only on the Spark master node */
def stop() {
if (masterActor != null) {
- communicate(StopBlockManagerMaster)
+ tell(StopBlockManagerMaster)
masterActor = null
logInfo("BlockManagerMaster stopped")
}
}
- // Send a message to the master actor and get its result within a default timeout, or
- // throw a SparkException if this fails.
- def askMaster(message: Any): Any = {
- try {
- val future = masterActor.ask(message)(timeout)
- return Await.result(future, timeout)
- } catch {
- case e: Exception =>
- throw new SparkException("Error communicating with BlockManagerMaster", e)
- }
- }
-
- // Send a one-way message to the master actor, to which we expect it to reply with true.
- def communicate(message: Any) {
- if (askMaster(message) != true) {
- throw new SparkException("Error reply received from BlockManagerMaster")
- }
- }
-
- def notifyADeadHost(host: String) {
- communicate(RemoveHost(host))
- logInfo("Removed " + host + " successfully in notifyADeadHost")
- }
-
- def mustRegisterBlockManager(msg: RegisterBlockManager) {
- logInfo("Trying to register BlockManager")
- while (! syncRegisterBlockManager(msg)) {
- logWarning("Failed to register " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- logInfo("Done registering BlockManager")
- }
-
- def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
- //val masterActor = RemoteActor.select(node, name)
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- communicate(msg)
- logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
- logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return true
- } catch {
- case e: Exception =>
- logError("Failed in syncRegisterBlockManager", e)
- return false
- }
- }
-
- def mustHeartBeat(msg: HeartBeat): Boolean = {
- var res = syncHeartBeat(msg)
- while (!res.isDefined) {
- logWarning("Failed to send heart beat " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+ private def tell(message: Any) {
+ if (!askMasterWithRetry[Boolean](message)) {
+ throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
- return res.get
}
- def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
- try {
- val answer = askMaster(msg).asInstanceOf[Boolean]
- return Some(answer)
- } catch {
- case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return None
+ /**
+ * Send a message to the master actor and get its result within a default timeout, or
+ * throw a SparkException if this fails.
+ */
+ private def askMasterWithRetry[T](message: Any): T = {
+ // TODO: Consider removing multiple attempts
+ if (masterActor == null) {
+ throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+ "[message = " + message + "]")
}
- }
-
- def mustBlockUpdate(msg: BlockUpdate): Boolean = {
- var res = syncBlockUpdate(msg)
- while (!res.isDefined) {
- logWarning("Failed to send block update " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- return res.get
- }
-
- def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Boolean]
- logDebug("Block update sent successfully")
- logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return Some(answer)
- } catch {
- case e: Exception =>
- logError("Failed in syncBlockUpdate", e)
- return None
- }
- }
-
- def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- var res = syncGetLocations(msg)
- while (res == null) {
- logInfo("Failed to get locations " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocations(msg)
- }
- return res
- }
-
- def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
- if (answer != null) {
- logDebug("GetLocations successful")
- logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocations")
- return null
+ var attempts = 0
+ var lastException: Exception = null
    while (attempts < AKKA_RETRY_ATTEMPTS) {
+ attempts += 1
+ try {
+ val future = masterActor.ask(message)(timeout)
+ val result = Await.result(future, timeout)
+ if (result == null) {
+ throw new Exception("BlockManagerMaster returned null")
+ }
+ return result.asInstanceOf[T]
+ } catch {
+ case ie: InterruptedException => throw ie
+ case e: Exception =>
+ lastException = e
+ logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
}
- } catch {
- case e: Exception =>
- logError("GetLocations failed", e)
- return null
+ Thread.sleep(AKKA_RETRY_INTERVAL_MS)
}
- }
- def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
- while (res == null) {
- logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocationsMultipleBlockIds(msg)
- }
- return res
+ throw new SparkException(
+ "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
- def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
- if (answer != null) {
- logDebug("GetLocationsMultipleBlockIds successful")
- logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
- Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocationsMultipleBlockIds")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetLocationsMultipleBlockIds failed", e)
- return null
- }
- }
-
- def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- var res = syncGetPeers(msg)
- while ((res == null) || (res.length != msg.size)) {
- logInfo("Failed to get peers " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetPeers(msg)
- }
-
- return res
- }
-
- def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
- if (answer != null) {
- logDebug("GetPeers successful")
- logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetPeers")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetPeers failed", e)
- return null
- }
- }
-
- def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
- }
}
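
askMasterWithRetry collapses the old per-message must*/sync* pairs into one generic ask loop: up to AKKA_RETRY_ATTEMPTS tries spaced AKKA_RETRY_INTERVAL_MS apart, null replies treated as failures, and InterruptedException rethrown immediately rather than retried. A condensed sketch of that shape, detached from Akka; askOnce stands in for masterActor.ask(message)(timeout) plus Await.result and is not a real API:

    // Hedged sketch of the retry loop above.
    def askWithRetry[T](attempts: Int, intervalMs: Long)(askOnce: () => T): T = {
      var lastException: Exception = null
      var tries = 0
      while (tries < attempts) {
        tries += 1
        try {
          val result = askOnce()
          if (result == null) {
            throw new Exception("master returned null")
          }
          return result
        } catch {
          case ie: InterruptedException => throw ie  // never retry across interrupts
          case e: Exception => lastException = e     // remember the failure and retry
        }
        Thread.sleep(intervalMs)
      }
      throw new Exception("gave up after " + attempts + " attempts", lastException)
    }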
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
new file mode 100644
index 0000000000..f4d026da33
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -0,0 +1,401 @@
+package spark.storage
+
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.util.{Duration, Timeout}
+import akka.util.duration._
+
+import spark.{Logging, Utils}
+
+/**
+ * BlockManagerMasterActor is an actor on the master node to track statuses of
+ * all slaves' block managers.
+ */
+private[spark]
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+
+ // Mapping from block manager id to the block manager's information.
+ private val blockManagerInfo =
+ new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+
+ // Mapping from host name to block manager id. We allow multiple block managers
+ // on the same host name (ip).
+ private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+
+ // Mapping from block id to the set of block managers that have the block.
+ private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+
+ initLogging()
+
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
+ def receive = {
+ case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+ register(blockManagerId, maxMemSize, slaveActor)
+
+ case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+ case GetLocations(blockId) =>
+ getLocations(blockId)
+
+ case GetLocationsMultipleBlockIds(blockIds) =>
+ getLocationsMultipleBlockIds(blockIds)
+
+ case GetPeers(blockManagerId, size) =>
+ getPeersDeterministic(blockManagerId, size)
+ /*getPeers(blockManagerId, size)*/
+
+ case GetMemoryStatus =>
+ getMemoryStatus
+
+ case RemoveBlock(blockId) =>
+ removeBlock(blockId)
+
+ case RemoveHost(host) =>
+ removeHost(host)
+ sender ! true
+
+ case StopBlockManagerMaster =>
+ logInfo("Stopping BlockManagerMaster")
+ sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
+ context.stop(self)
+
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
+ case other =>
+ logInfo("Got unknown message: " + other)
+ }
+
+ def removeBlockManager(blockManagerId: BlockManagerId) {
+ val info = blockManagerInfo(blockManagerId)
+
+ // Remove the block manager from blockManagerIdByHost. If the list of block
+ // managers belonging to the IP is empty, remove the entry from the hash map.
+ blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
+ managers -= blockManagerId
+ if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
+ }
+
+ // Remove it from blockManagerInfo and remove all the blocks.
+ blockManagerInfo.remove(blockManagerId)
+ var iterator = info.blocks.keySet.iterator
+ while (iterator.hasNext) {
+ val blockId = iterator.next
+ val locations = blockLocations.get(blockId)._2
+ locations -= blockManagerId
+ if (locations.size == 0) {
+        blockLocations.remove(blockId)
+ }
+ }
+ }
+
+ def expireDeadHosts() {
+ logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ toRemove += info.blockManagerId
+ }
+ }
+ toRemove.foreach(removeBlockManager)
+ }
+
+ def removeHost(host: String) {
+ logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
+ logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
+ blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
+ logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+ sender ! true
+ }
+
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ private def removeBlock(blockId: String) {
+ val block = blockLocations.get(blockId)
+ if (block != null) {
+ block._2.foreach { blockManagerId: BlockManagerId =>
+ val blockManager = blockManagerInfo.get(blockManagerId)
+ if (blockManager.isDefined) {
+ // Remove the block from the slave's BlockManager.
+ // Doesn't actually wait for a confirmation and the message might get lost.
+ // If message loss becomes frequent, we should add retry logic here.
+ blockManager.get.slaveActor ! RemoveBlock(blockId)
+ }
+ }
+ }
+ sender ! true
+ }
+
+ // Return a map from the block manager id to max memory and remaining memory.
+ private def getMemoryStatus() {
+ val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ (blockManagerId, (info.maxMem, info.remainingMem))
+ }.toMap
+ sender ! res
+ }
+
+ private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " "
+
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ logInfo("Got Register Msg from master node, don't register it")
+ } else {
+ blockManagerIdByHost.get(blockManagerId.ip) match {
+ case Some(managers) =>
+ // A block manager of the same host name already exists.
+ logInfo("Got another registration for host " + blockManagerId)
+ managers += blockManagerId
+ case None =>
+ blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+ }
+
+ blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
+ blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+ }
+ sender ! true
+ }
+
+ private def updateBlockInfo(
+ blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long) {
+
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " " + blockId + " "
+
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
+ return
+ }
+
+ if (blockId == null) {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ return
+ }
+
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
+
+ var locations: HashSet[BlockManagerId] = null
+ if (blockLocations.containsKey(blockId)) {
+ locations = blockLocations.get(blockId)._2
+ } else {
+ locations = new HashSet[BlockManagerId]
+ blockLocations.put(blockId, (storageLevel.replication, locations))
+ }
+
+ if (storageLevel.isValid) {
+ locations.add(blockManagerId)
+ } else {
+ locations.remove(blockManagerId)
+ }
+
+ // Remove the block from master tracking if it has been removed on all slaves.
+ if (locations.size == 0) {
+ blockLocations.remove(blockId)
+ }
+ sender ! true
+ }
+
+ private def getLocations(blockId: String) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockId + " "
+ if (blockLocations.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockLocations.get(blockId)._2)
+ sender ! res.toSeq
+ } else {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ sender ! res
+ }
+ }
+
+ private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ val tmp = blockId
+ if (blockLocations.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockLocations.get(blockId)._2)
+ return res.toSeq
+ } else {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ return res.toSeq
+ }
+ }
+
+ var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
+ for (blockId <- blockIds) {
+ res.append(getLocations(blockId))
+ }
+ sender ! res.toSeq
+ }
+
+ private def getPeers(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(peers)
+ res -= blockManagerId
+ val rand = new Random(System.currentTimeMillis())
+ while (res.length > size) {
+ res.remove(rand.nextInt(res.length))
+ }
+ sender ! res.toSeq
+ }
+
+ private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+
+ val selfIndex = peers.indexOf(blockManagerId)
+ if (selfIndex == -1) {
+ throw new Exception("Self index for " + blockManagerId + " not found")
+ }
+
+ // Note that this logic will select the same node multiple times if there aren't enough peers
+ var index = selfIndex
+ while (res.size < size) {
+ index += 1
+ if (index == selfIndex) {
+ throw new Exception("More peer expected than available")
+ }
+ res += peers(index % peers.size)
+ }
+ sender ! res.toSeq
+ }
+}
+
+
+private[spark]
+object BlockManagerMasterActor {
+
+ case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+
+ class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
+ timeMs: Long,
+ val maxMem: Long,
+ val slaveActor: ActorRef)
+ extends Logging {
+
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
+
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[String, BlockStatus]
+
+ logInfo("Registering block manager %s:%d with %s RAM".format(
+ blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
+
+ def updateLastSeenMs() {
+ _lastSeenMs = System.currentTimeMillis()
+ }
+
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+ : Unit = synchronized {
+
+ updateLastSeenMs()
+
+ if (_blocks.containsKey(blockId)) {
+ // The block exists on the slave already.
+      val originalInfo: BlockStatus = _blocks.get(blockId)
+
+      if (originalInfo.storageLevel.useMemory) {
+        // Credit back the size previously recorded for this block, not the incoming one.
+        _remainingMem += originalInfo.memSize
+      }
+ }
+
+ if (storageLevel.isValid) {
+ // isValid means it is either stored in-memory or on-disk.
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+ if (storageLevel.useMemory) {
+ _remainingMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (storageLevel.useDisk) {
+ logInfo("Added %s on disk on %s:%d (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ } else if (_blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val blockStatus: BlockStatus = _blocks.get(blockId)
+ _blocks.remove(blockId)
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
+ logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (blockStatus.storageLevel.useDisk) {
+ logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ }
+ }
+
+ def remainingMem: Long = _remainingMem
+
+ def lastSeenMs: Long = _lastSeenMs
+
+ def blocks: JHashMap[String, BlockStatus] = _blocks
+
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+
+ def clear() {
+ _blocks.clear()
+ }
+ }
+}
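The bookkeeping above has one invariant worth calling out: updateBlockInfo first credits _remainingMem with the memory size previously recorded for the block, then debits the new size, so repeated updates for the same block never double-count. A minimal, self-contained sketch of that credit-then-debit pattern, stripped of the actor machinery (every name below is illustrative, not part of the patch):

    import scala.collection.mutable

    object MemAccountingDemo extends App {
      // Illustrative stand-in for BlockStatus: whether the block is held in
      // memory and how many bytes it occupies there.
      case class Status(inMemory: Boolean, memSize: Long)

      class MemAccounting(val maxMem: Long) {
        private val blocks = new mutable.HashMap[String, Status]
        private var remaining = maxMem

        // Mirrors updateBlockInfo: credit any previously recorded in-memory
        // size first, then debit the new one.
        def update(blockId: String, status: Status) {
          for (old <- blocks.get(blockId) if old.inMemory) remaining += old.memSize
          if (status.inMemory) {
            blocks(blockId) = status
            remaining -= status.memSize
          } else {
            blocks -= blockId
          }
        }

        def remainingMem: Long = remaining
      }

      val acct = new MemAccounting(2000)
      acct.update("a1", Status(inMemory = true, memSize = 400))
      acct.update("a1", Status(inMemory = true, memSize = 400))
      assert(acct.remainingMem == 1600) // idempotent: not 1200
    }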
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
new file mode 100644
index 0000000000..d73a9b790f
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -0,0 +1,102 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import akka.actor.ActorRef
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from the master to slaves.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerSlave
+
+// Remove a block from the slaves that have it. This can only be used to remove
+// blocks that the master knows about.
+private[spark]
+case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from slaves to the master.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerMaster
+
+private[spark]
+case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+private[spark]
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class UpdateBlockInfo(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeLong(memSize)
+ out.writeLong(diskSize)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = new BlockManagerId()
+ blockManagerId.readExternal(in)
+ blockId = in.readUTF()
+ storageLevel = new StorageLevel()
+ storageLevel.readExternal(in)
+ memSize = in.readLong()
+ diskSize = in.readLong()
+ }
+}
+
+private[spark]
+object UpdateBlockInfo {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
+}
+
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
+
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
+
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
+
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
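Every request above that expects an answer resolves to a single reply from the master actor, so the natural client-side shape is Akka's ask pattern. A sketch of how a caller might send UpdateBlockInfo and wait for the master's Boolean acknowledgement; the imports use current Akka spellings and the timeout is an assumption, while the real plumbing lives in BlockManagerMaster:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Illustrative helper, not code from the patch: send the update and block
    // for the master's Boolean reply.
    def tellMasterBlockUpdate(
        master: ActorRef,
        id: BlockManagerId,
        blockId: String,
        level: StorageLevel,
        memSize: Long,
        diskSize: Long): Boolean = {
      implicit val timeout = Timeout(10.seconds) // assumed value
      val reply = master ? UpdateBlockInfo(id, blockId, level, memSize, diskSize)
      Await.result(reply, timeout.duration).asInstanceOf[Boolean]
    }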
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000..f570cdc52d
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+import spark.{Logging, SparkException, Utils}
+
+
+/**
+ * An actor that takes commands from the master and executes them. For example,
+ * this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+ override def receive = {
+ case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+ }
+}
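In the other direction, RemoveBlock is fire-and-forget: the master tells each slave holding the block and expects no reply, which is why the receive block above never responds to the sender. A minimal sketch of the sending side (the locations argument stands in for the master's per-block state; in the patch the ActorRef lives in BlockManagerInfo.slaveActor):

    import akka.actor.ActorRef

    // Illustrative: ask every slave that holds the block to drop it locally.
    def removeBlockFromSlaves(locations: Seq[ActorRef], blockId: String) {
      locations.foreach { slaveActor =>
        // Tell, not ask: no reply is expected.
        slaveActor ! RemoveBlock(blockId)
      }
    }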
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 096bf8bdd9..8188d3595e 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def getValues(blockId: String): Option[Iterator[Any]]
- def remove(blockId: String)
+ /**
+ * Remove a block, if it exists.
+ * @param blockId the block to remove.
+ * @return True if the block was found and removed, False otherwise.
+ */
+ def remove(blockId: String): Boolean
def contains(blockId: String): Boolean
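Returning Boolean from remove lets callers distinguish a genuine removal from a no-op, instead of each store logging its own warning (note the logWarning deleted from MemoryStore below). A sketch of the calling convention this enables, where removeFromStores is a hypothetical caller rather than code from the patch:

    // Try both stores; warn only if neither actually held the block.
    def removeFromStores(blockId: String, memoryStore: BlockStore, diskStore: BlockStore) {
      val inMemory = memoryStore.remove(blockId)
      val onDisk = diskStore.remove(blockId)
      if (!inMemory && !onDisk) {
        println("Block " + blockId + " could not be removed as it was not found")
      }
    }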
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index b5561479db..7e5b820cbb 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -92,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
val file = getFile(blockId)
if (file.exists()) {
- file.delete()
+ // delete() returns false if the file could not be removed; pass that on.
+ file.delete()
+ } else {
+ false
}
}
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 02098b82fe..00e32f753c 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
@@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
+ true
} else {
- logWarning("Block " + blockId + " could not be removed as it does not exist")
+ false
}
}
}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index c497f03e0c..e3544e5aae 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -10,14 +10,16 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
* commonly useful storage levels.
*/
class StorageLevel(
- var useDisk: Boolean,
+ var useDisk: Boolean,
var useMemory: Boolean,
var deserialized: Boolean,
var replication: Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
-
+
+ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+
def this(flags: Int, replication: Int) {
this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
@@ -29,14 +31,14 @@ class StorageLevel(
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
- s.useDisk == useDisk &&
+ s.useDisk == useDisk &&
s.useMemory == useMemory &&
s.deserialized == deserialized &&
- s.replication == replication
+ s.replication == replication
case _ =>
false
}
-
+
def isValid = ((useMemory || useDisk) && (replication > 0))
def toInt: Int = {
@@ -66,10 +68,16 @@ class StorageLevel(
replication = in.readByte()
}
+ @throws(classOf[IOException])
+ private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
+
override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
+
+ override def hashCode(): Int = toInt * 41 + replication
}
+
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
@@ -82,4 +90,16 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+
+ private[spark]
+ val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+
+ private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
+ // putIfAbsent returns the previously cached level, or null if this call
+ // inserted one. Either way every caller sees the same canonical instance,
+ // avoiding the check-then-act race a containsKey/put pair would have
+ // between concurrent deserializations.
+ val cached = storageLevelCache.putIfAbsent(level, level)
+ if (cached != null) cached else level
+ }
}
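Taken together, hashCode, equals, readResolve, and the cache funnel every deserialized StorageLevel with the same flags and replication onto one canonical instance, so reference equality holds after a round trip; that is exactly what the new "StorageLevel object caching" test in BlockManagerSuite asserts. A quick self-contained check of the property, using plain Java serialization as a stand-in for spark.Utils.serialize:

    import java.io._

    def roundTrip[T](obj: T): T = {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(obj)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      in.readObject().asInstanceOf[T]
    }

    val a = roundTrip(new StorageLevel(false, true, false, 2))
    val b = roundTrip(new StorageLevel(false, true, false, 2))
    // readResolve routes both deserialized copies through getCachedStorageLevel.
    assert(a eq b)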
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 5bb5a29cc4..689f07b969 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
- println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " +
+ (System.currentTimeMillis - startTime) + " ms")
case None =>
assert(false, "Block " + blockId + " could not be retrieved")
}
@@ -73,7 +75,9 @@ private[spark] object ThreadingTest {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
- val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
actorSystem.shutdown()
actorSystem.awaitTermination()
println("Everything stopped.")
- println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+ println(
+ "It will take some time for the JVM to clean all temporary files and shut down. Sit tight.")
}
}
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
new file mode 100644
index 0000000000..b6e309fe1a
--- /dev/null
+++ b/core/src/main/scala/spark/util/IdGenerator.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A utility for generating unique IDs, implemented as a thin wrapper around
+ * Java's AtomicInteger. One example use is in BlockManager, where each
+ * BlockManager instance starts an Akka actor and uses this utility to give
+ * those actors unique names.
+ */
+private[spark] class IdGenerator {
+ private val id = new AtomicInteger
+ def next: Int = id.incrementAndGet
+}
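The generator's only job is to hand out distinct suffixes cheaply and thread-safely. A sketch of the actor-naming use the comment describes (the name format is illustrative):

    private val idGenerator = new IdGenerator

    // Produces "BlockManagerActor1", "BlockManagerActor2", ... across threads.
    def nextActorName: String = "BlockManagerActor" + idGenerator.next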
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
new file mode 100644
index 0000000000..19e67acd0c
--- /dev/null
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -0,0 +1,36 @@
+package spark.util
+
+import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
+import java.util.{TimerTask, Timer}
+import spark.Logging
+
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+
+ // The delay is configured in minutes; the negative default disables cleanup.
+ val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
+ val periodSeconds = math.max(10, delaySeconds / 10)
+ val timer = new Timer(name + " cleanup timer", true)
+
+ val task = new TimerTask {
+ def run() {
+ try {
+ if (delaySeconds > 0) {
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
+ }
+ } catch {
+ case e: Exception => logError("Error running cleanup task for " + name, e)
+ }
+ }
+ }
+ if (delaySeconds > 0) {
+ logInfo(
+ "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ + "period of " + periodSeconds + " secs")
+ timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
+ }
+
+ def cancel() {
+ timer.cancel()
+ }
+}
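To make the units concrete: spark.cleanup.delay is read in minutes and converted to seconds, the cleanup period is a tenth of the delay but never less than 10 seconds, and the negative default leaves the timer unscheduled. A sketch wiring the cleaner to a timestamped map (the property value and map contents are illustrative):

    import spark.util.{MetadataCleaner, TimeStampedHashMap}

    System.setProperty("spark.cleanup.delay", "30") // minutes

    val blockInfo = new TimeStampedHashMap[String, Int]
    // delaySeconds = 1800, periodSeconds = 180: every ~3 minutes, drop
    // entries older than 30 minutes.
    val cleaner = new MetadataCleaner("blockInfo", blockInfo.cleanup)

    // ... and on shutdown:
    cleaner.cancel()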
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
new file mode 100644
index 0000000000..070ee19ac0
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -0,0 +1,87 @@
+package spark.util
+
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, Map}
+
+/**
+ * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
+ * time stamp along with each key-value pair. Key-value pairs that are older than a particular
+ * threshold time can then be removed using the cleanup method. This is intended to be a drop-in
+ * replacement for scala.collection.mutable.HashMap.
+ */
+class TimeStampedHashMap[A, B] extends Map[A, B]() {
+ val internalMap = new ConcurrentHashMap[A, (B, Long)]()
+
+ def get(key: A): Option[B] = {
+ val value = internalMap.get(key)
+ if (value != null) Some(value._1) else None
+ }
+
+ def iterator: Iterator[(A, B)] = {
+ val jIterator = internalMap.entrySet().iterator()
+ jIterator.map(kv => (kv.getKey, kv.getValue._1))
+ }
+
+ override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
+ val newMap = new TimeStampedHashMap[A, B1]
+ newMap.internalMap.putAll(this.internalMap)
+ newMap.internalMap.put(kv._1, (kv._2, currentTime))
+ newMap
+ }
+
+ override def - (key: A): Map[A, B] = { // Note: removes in place and returns this map
+ internalMap.remove(key)
+ this
+ }
+
+ override def += (kv: (A, B)): this.type = {
+ internalMap.put(kv._1, (kv._2, currentTime))
+ this
+ }
+
+ override def -= (key: A): this.type = {
+ internalMap.remove(key)
+ this
+ }
+
+ override def update(key: A, value: B) {
+ this += ((key, value))
+ }
+
+ override def apply(key: A): B = {
+ val value = internalMap.get(key)
+ if (value == null) throw new NoSuchElementException()
+ value._1
+ }
+
+ override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
+ internalMap.map(kv => (kv._1, kv._2._1)).filter(p)
+ }
+
+ override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
+
+ override def size(): Int = internalMap.size()
+
+ override def foreach[U](f: ((A, B)) => U): Unit = {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val kv = (entry.getKey, entry.getValue._1)
+ f(kv)
+ }
+ }
+
+ def cleanup(threshTime: Long) {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ if (entry.getValue._2 < threshTime) {
+ iterator.remove()
+ }
+ }
+ }
+
+ private def currentTime: Long = System.currentTimeMillis()
+
+}
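Because every write stamps the entry with the current time, cleanup(threshTime) is a single sweep that drops whatever predates the threshold. A short illustration of that semantics (the sleep merely separates the two insertion times):

    val map = new TimeStampedHashMap[String, Int]
    map("old") = 1
    Thread.sleep(50)
    val cutoff = System.currentTimeMillis()
    map("new") = 2

    // Entries inserted before the cutoff go; later ones stay.
    map.cleanup(cutoff)
    assert(map.get("old") == None)
    assert(map.get("new") == Some(2))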
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index ad2253596d..8f86e3170e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -7,6 +7,10 @@ import akka.actor._
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
import spark.KryoSerializer
import spark.SizeEstimator
@@ -20,15 +24,16 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldArch: String = null
var oldOops: String = null
var oldHeartBeat: String = null
-
- // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
- master = new BlockManagerMaster(actorSystem, true, true)
+ master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -63,7 +68,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
- test("manager-master interaction") {
+ test("StorageLevel object caching") {
+ val level1 = new StorageLevel(false, false, false, 3)
+ val level2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(level1)
+ val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(level2)
+ val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(level1_ === level1, "Deserialized level1 not same as original level1")
+ assert(level2_ === level2, "Deserialized level2 not same as original level2")
+ assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
+ assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
+ }
+
+ test("BlockManagerId object caching") {
+ val id1 = new StorageLevel(false, false, false, 3)
+ val id2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(id1)
+ val id1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(id2)
+ val id2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(id1_ === id1, "Deserialized id1 not same as original id1")
+ assert(id2_ === id2, "Deserialized id2 not same as original id1")
+ assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
+ assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1")
+ }
+
+ test("master + 1 manager interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -74,83 +105,122 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
- // Checking whether blocks are in memory
+ // Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
+
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.getLocations("a1").size === 0, "master did not remove a1")
+ assert(master.getLocations("a2").size === 0, "master did not remove a2")
}
- test("reregistration on heart beat") {
- val heartBeat = PrivateMethod[Unit]('heartBeat)
+ test("master + 2 managers interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
+ store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+
+ val peers = master.getPeers(store.blockManagerId, 1)
+ assert(peers.size === 1, "master did not return the other manager as a peer")
+ assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
+
val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+ assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
+ }
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ test("removing block") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
- master.notifyADeadHost(store.blockManagerId.ip)
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+ // Checking whether blocks are in memory and memory size
+ val memStatus = master.getMemoryStatus.head._2
+ assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+ assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+ assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+ assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+ assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
- store invokePrivate heartBeat()
- assert(master.mustGetLocations(GetLocations("a1")).size > 0,
- "a1 was not reregistered with master")
+ // Checking whether master knows about the blocks or not
+ assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+ // Remove a1 and a2 and a3. Should be no-op for a3.
+ master.removeBlock("a1-to-remove")
+ master.removeBlock("a2-to-remove")
+ master.removeBlock("a3-to-remove")
+
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a1-to-remove") should be (None)
+ master.getLocations("a1-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a2-to-remove") should be (None)
+ master.getLocations("a2-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a3-to-remove") should not be (None)
+ master.getLocations("a3-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ val memStatus = master.getMemoryStatus.head._2
+ memStatus._1 should equal (2000L)
+ memStatus._2 should equal (2000L)
+ }
}
- test("reregistration on block update") {
+ test("reregistration on heart beat") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
-
- store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- assert(master.mustGetLocations(GetLocations("a1")).size > 0,
- "a1 was not reregistered with master")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0,
- "master was not told about a2")
+ store invokePrivate heartBeat()
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
}
- test("deregistration on duplicate") {
- val heartBeat = PrivateMethod[Unit]('heartBeat)
+ test("reregistration on block update") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
- store2 = new BlockManager(actorSystem, master, serializer, 2000)
-
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store invokePrivate heartBeat()
-
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
- store2 invokePrivate heartBeat()
-
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
test("in-memory LRU storage") {
@@ -171,7 +241,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
test("in-memory LRU storage with serialization") {
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)