aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala207
1 files changed, 144 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 367c79dd76..9e4816f7ce 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,6 +27,7 @@ import spark.SizeEstimator
import spark.SparkEnv
import spark.SparkException
import spark.Utils
+import spark.util.ByteBufferInputStream
import spark.network._
class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@@ -65,19 +66,15 @@ class BlockLocker(numLockers: Int) {
}
-/**
- * A start towards a block manager class. This will eventually be used for both RDD persistence
- * and shuffle outputs.
- *
- * TODO: Should make the communication with Master or Peers code more robust and log friendly.
- */
+
class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging {
-
+
+ case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
+
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
- private val storageLevels = Collections.synchronizedMap(new JHashMap[String, StorageLevel])
-
+ private val blockInfo = Collections.synchronizedMap(new JHashMap[String, BlockInfo])
private val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private val diskStore: BlockStore = new DiskStore(this,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -87,7 +84,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
val connectionManagerId = connectionManager.id
val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
- // TODO(Haoyuan): This will be removed after cacheTracker is removed from the code base.
+ // TODO: This will be removed after cacheTracker is removed from the code base.
var cacheTracker: CacheTracker = null
initLogging()
@@ -104,12 +101,54 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
* BlockManagerWorker actor.
*/
- def initialize() {
+ private def initialize() {
BlockManagerMaster.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
}
-
+
+ /**
+ * Get storage level of local block. If no info exists for the block, then returns null.
+ */
+ def getLevel(blockId: String): StorageLevel = {
+ val info = blockInfo.get(blockId)
+ if (info != null) info.level else null
+ }
+
+ /**
+ * Change storage level for a local block and tell master is necesary.
+ * If new level is invalid, then block info (if it exists) will be silently removed.
+ */
+ def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+ if (level == null) {
+ throw new IllegalArgumentException("Storage level is null")
+ }
+
+ // If there was earlier info about the block, then use earlier tellMaster
+ val oldInfo = blockInfo.get(blockId)
+ val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
+ if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
+ logWarning("Ignoring tellMaster setting as it is different from earlier setting")
+ }
+
+ // If level is valid, store the block info, else remove the block info
+ if (level.isValid) {
+ blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
+ logDebug("Info for block " + blockId + " updated with new level as " + level)
+ } else {
+ blockInfo.remove(blockId)
+ logDebug("Info for block " + blockId + " removed as new level is null or invalid")
+ }
+
+ // Tell master if necessary
+ if (newTellMaster) {
+ logDebug("Told master about block " + blockId)
+ notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
+ } else {
+ logDebug("Did not tell master about block " + blockId)
+ }
+ }
+
/**
* Get locations of the block.
*/
@@ -122,9 +161,9 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
/**
- * Get locations of an array of blocks
+ * Get locations of an array of blocks.
*/
- def getLocationsMultipleBlockIds(blockIds: Array[String]): Array[Seq[String]] = {
+ def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
val locations = BlockManagerMaster.mustGetLocationsMultipleBlockIds(
GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
@@ -132,12 +171,18 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
return locations
}
+ /**
+ * Get block from local block manager.
+ */
def getLocal(blockId: String): Option[Iterator[Any]] = {
- logDebug("Getting block " + blockId)
+ if (blockId == null) {
+ throw new IllegalArgumentException("Block Id is null")
+ }
+ logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized {
// Check storage level of block
- val level = storageLevels.get(blockId)
+ val level = getLevel(blockId)
if (level != null) {
logDebug("Level for block " + blockId + " is " + level + " on local machine")
@@ -181,12 +226,20 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
return None
}
+ /**
+ * Get block from remote block managers.
+ */
def getRemote(blockId: String): Option[Iterator[Any]] = {
+ if (blockId == null) {
+ throw new IllegalArgumentException("Block Id is null")
+ }
+ logDebug("Getting remote block " + blockId)
// Get locations of block
val locations = BlockManagerMaster.mustGetLocations(GetLocations(blockId))
// Get block from remote locations
for (loc <- locations) {
+ logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
if (data != null) {
@@ -200,16 +253,19 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
/**
- * Read a block from the block manager.
+ * Get a block from the block manager (either local or remote).
*/
def get(blockId: String): Option[Iterator[Any]] = {
getLocal(blockId).orElse(getRemote(blockId))
}
/**
- * Read many blocks from block manager using their BlockManagerIds.
+ * Get many blocks from local and remote block manager using their BlockManagerIds.
*/
def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = {
+ if (blocksByAddress == null) {
+ throw new IllegalArgumentException("BlocksByAddress is null")
+ }
logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks")
var startTime = System.currentTimeMillis
val blocks = new HashMap[String,Option[Iterator[Any]]]()
@@ -235,7 +291,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
(cmId, future)
}
- logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+ logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " +
+ Utils.getUsedTimeMs(startTime) + " ms")
// Get the local blocks while remote blocks are being fetched
startTime = System.currentTimeMillis
@@ -276,7 +333,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
throw new BlockException(oneBlockId, "Could not get blocks from " + cmId)
}
}
- logDebug("Got remote " + count + " blocks from " + cmId.host + " in " + Utils.getUsedTimeMs(startTime) + " ms")
+ logDebug("Got remote " + count + " blocks from " + cmId.host + " in " +
+ Utils.getUsedTimeMs(startTime) + " ms")
}
logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
@@ -284,29 +342,32 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
/**
- * Write a new block to the block manager.
+ * Put a new block of values to the block manager.
*/
def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) {
- if (!level.useDisk && !level.useMemory) {
- throw new IllegalArgumentException("Storage level has neither useMemory nor useDisk set")
+ if (blockId == null) {
+ throw new IllegalArgumentException("Block Id is null")
+ }
+ if (values == null) {
+ throw new IllegalArgumentException("Values is null")
+ }
+ if (level == null || !level.isValid) {
+ throw new IllegalArgumentException("Storage level is null or invalid")
}
val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
locker.getLock(blockId).synchronized {
- logDebug("Put for block " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
+ logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
// Check and warn if block with same id already exists
- if (storageLevels.get(blockId) != null) {
+ if (getLevel(blockId) != null) {
logWarning("Block " + blockId + " already exists in local machine")
return
}
- // Store the storage level
- storageLevels.put(blockId, level)
-
if (level.useMemory && level.useDisk) {
// If saving to both memory and disk, then serialize only once
memoryStore.putValues(blockId, values, level) match {
@@ -333,11 +394,10 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
}
- if (tellMaster) {
- notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
- logDebug("Put block " + blockId + " after notifying the master " + Utils.getUsedTimeMs(startTimeMs))
- }
+ // Store the storage level
+ setLevel(blockId, level, tellMaster)
}
+ logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
// Replicate block if required
if (level.replication > 1) {
@@ -347,21 +407,32 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
replicate(blockId, bytes, level)
}
- // TODO(Haoyuan): This code will be removed when CacheTracker is gone.
+ // TODO: This code will be removed when CacheTracker is gone.
if (blockId.startsWith("rdd")) {
notifyTheCacheTracker(blockId)
}
- logDebug("Put block " + blockId + " after notifying the CacheTracker " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
}
+ /**
+ * Put a new block of serialized bytes to the block manager.
+ */
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
- val startTime = System.currentTimeMillis
- if (!level.useDisk && !level.useMemory) {
- throw new IllegalArgumentException("Storage level has neither useMemory nor useDisk set")
- } else if (level.deserialized) {
- throw new IllegalArgumentException("Storage level cannot have deserialized when putBytes is used")
+ if (blockId == null) {
+ throw new IllegalArgumentException("Block Id is null")
+ }
+ if (bytes == null) {
+ throw new IllegalArgumentException("Bytes is null")
+ }
+ if (level == null || !level.isValid) {
+ throw new IllegalArgumentException("Storage level is null or invalid")
}
+
+ val startTimeMs = System.currentTimeMillis
+
+ // Initiate the replication before storing it locally. This is faster as
+ // data is already serialized and ready for sending
val replicationFuture = if (level.replication > 1) {
future {
replicate(blockId, bytes, level)
@@ -371,13 +442,12 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
locker.getLock(blockId).synchronized {
- logDebug("PutBytes for block " + blockId + " used " + Utils.getUsedTimeMs(startTime)
+ logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (storageLevels.get(blockId) != null) {
+ if (getLevel(blockId) != null) {
logWarning("Block " + blockId + " already exists")
return
}
- storageLevels.put(blockId, level)
if (level.useMemory) {
memoryStore.putBytes(blockId, bytes, level)
@@ -385,15 +455,17 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
if (level.useDisk) {
diskStore.putBytes(blockId, bytes, level)
}
- if (tellMaster) {
- notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
- }
+
+ // Store the storage level
+ setLevel(blockId, level, tellMaster)
}
+ // TODO: This code will be removed when CacheTracker is gone.
if (blockId.startsWith("rdd")) {
notifyTheCacheTracker(blockId)
}
-
+
+ // If replication had started, then wait for it to finish
if (level.replication > 1) {
if (replicationFuture == null) {
throw new Exception("Unexpected")
@@ -403,13 +475,18 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
val finishTime = System.currentTimeMillis
if (level.replication > 1) {
- logDebug("PutBytes with replication took " + (finishTime - startTime) + " ms")
+ logDebug("PutBytes for block " + blockId + " with replication took " +
+ Utils.getUsedTimeMs(startTimeMs))
} else {
- logDebug("PutBytes without replication took " + (finishTime - startTime) + " ms")
+ logDebug("PutBytes for block " + blockId + " without replication took " +
+ Utils.getUsedTimeMs(startTimeMs))
}
-
}
+ /**
+ * Replicate block to another node.
+ */
+
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
@@ -429,8 +506,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
}
- // TODO(Haoyuan): This code will be removed when CacheTracker is gone.
- def notifyTheCacheTracker(key: String) {
+ // TODO: This code will be removed when CacheTracker is gone.
+ private def notifyTheCacheTracker(key: String) {
val rddInfo = key.split(":")
val rddId: Int = rddInfo(1).toInt
val splitIndex: Int = rddInfo(2).toInt
@@ -448,8 +525,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
/**
* Write a block consisting of a single object.
*/
- def putSingle(blockId: String, value: Any, level: StorageLevel) {
- put(blockId, Iterator(value), level)
+ def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
+ put(blockId, Iterator(value), level, tellMaster)
}
/**
@@ -457,7 +534,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
*/
def dropFromMemory(blockId: String) {
locker.getLock(blockId).synchronized {
- val level = storageLevels.get(blockId)
+ val level = getLevel(blockId)
if (level == null) {
logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
return
@@ -467,14 +544,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
return
}
memoryStore.remove(blockId)
- if (!level.useDisk) {
- storageLevels.remove(blockId)
- } else {
- val newLevel = level.clone
- newLevel.useMemory = false
- storageLevels.remove(blockId)
- storageLevels.put(blockId, newLevel)
- }
+ val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
+ setLevel(blockId, newLevel)
}
}
@@ -489,14 +560,24 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
/*serializer.newInstance().deserializeMany(bytes)*/
val ser = serializer.newInstance()
- return ser.deserializeStream(new FastByteArrayInputStream(bytes.array())).toIterator
+ bytes.rewind()
+ return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
}
private def notifyMaster(heartBeat: HeartBeat) {
BlockManagerMaster.mustHeartBeat(heartBeat)
}
+
+ def stop() {
+ connectionManager.stop()
+ blockInfo.clear()
+ memoryStore.clear()
+ diskStore.clear()
+ logInfo("BlockManager stopped")
+ }
}
+
object BlockManager extends Logging {
def getMaxMemoryFromSystemProperties(): Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble