author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-23 22:04:35 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-23 22:04:35 -0700
commit    d290e964ea190e6e782461134119fa6d1f90301f (patch)
tree      0b6d54873ae05ee8e6db68142795220e07f083f1 /core
parent    0bd20c63e2ed3ca8e3577c2805678d9d34218ffd (diff)
parent    3b97124604de3e359ebd53df96e79c64e7d82517 (diff)
Merge pull request #281 from rxin/memreport
Added a method to report slave memory status; force serialize accumulator update in local mode.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                   |  15
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala |   9
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala     | 140
3 files changed, 93 insertions(+), 71 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index becf737597..4975e2a9fc 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -45,7 +45,6 @@ import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerMaster
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -199,7 +198,7 @@ class SparkContext(
parallelize(seq, numSlices)
}
- /**
+ /**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
@@ -400,7 +399,7 @@ class SparkContext(
new Accumulable(initialValue, param)
}
- /**
+ /**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
@@ -427,6 +426,16 @@ class SparkContext(
}
/**
+ * Return a map from each slave (keyed as "ip:port") to the maximum memory available for
+ * caching and the remaining memory available for caching, in bytes.
+ */
+ def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+ env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+ (blockManagerId.ip + ":" + blockManagerId.port, mem)
+ }
+ }
+
+ /**
 * Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
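The getSlavesMemoryStatus method added above lets a driver program inspect, per slave, how much memory is available for caching. A minimal sketch of how it might be called from a driver (the SparkContext constructor arguments are illustrative, not part of this patch):

    // Hypothetical driver snippet; assumes a running local or cluster SparkContext.
    val sc = new SparkContext("local", "MemoryReportExample")

    // Keys are "ip:port" of each slave's block manager; values are
    // (max memory for caching, remaining memory for caching), in bytes.
    val status: Map[String, (Long, Long)] = sc.getSlavesMemoryStatus
    for ((slave, (maxMem, remainingMem)) <- status) {
      println(slave + ": " + remainingMem + " of " + maxMem + " bytes free for caching")
    }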
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index b84b4dc2ed..eb20fe41b2 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -30,12 +30,12 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
-
+
// TODO: Need to take into account stage priority in scheduling
override def start() { }
- override def setListener(listener: TaskSchedulerListener) {
+ override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
@@ -78,7 +78,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val resultToReturn = ser.deserialize[Any](ser.serialize(result))
- val accumUpdates = Accumulators.values
+ val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
+ ser.serialize(Accumulators.values))
logInfo("Finished task " + idInJob)
listener.taskEnded(task, Success, resultToReturn, accumUpdates)
} catch {
@@ -126,7 +127,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
}
}
}
-
+
override def stop() {
threadPool.shutdownNow()
}
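The LocalScheduler change above forces accumulator updates through a serialize/deserialize round trip even in local mode, so values that are not serializable fail during development rather than only once the program moves to a cluster. A minimal, self-contained sketch of the same round-trip idea using plain Java serialization (the actual code uses Spark's configured serializer and the Accumulators object, which are not reproduced here):

    import java.io._

    // Serialize and immediately deserialize a value, mimicking what a cluster
    // would do to task results and accumulator updates before they reach the driver.
    def roundTrip[T](value: T): T = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)  // throws NotSerializableException for problematic values
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      in.readObject().asInstanceOf[T]
    }

    // Accumulator updates collected as a map from accumulator id to value.
    val updates = collection.mutable.Map[Long, Any](1L -> 42, 2L -> "ok")
    val checked = roundTrip(updates)  // fails fast if any update cannot be serialized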
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7bfa31ac3d..b3345623b3 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -71,76 +71,79 @@ object HeartBeat {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
-
+
private[spark]
case class GetLocations(blockId: String) extends ToBlockManagerMaster
private[spark]
case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
+
private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
+
private[spark]
case class RemoveHost(host: String) extends ToBlockManagerMaster
private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
+
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long) {
- private var lastSeenMs = timeMs
- private var remainingMem = maxMem
- private val blocks = new JHashMap[String, StorageLevel]
+ private var _lastSeenMs = timeMs
+ private var _remainingMem = maxMem
+ private val _blocks = new JHashMap[String, StorageLevel]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
+
def updateLastSeenMs() {
- lastSeenMs = System.currentTimeMillis() / 1000
+ _lastSeenMs = System.currentTimeMillis() / 1000
}
-
+
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
: Unit = synchronized {
updateLastSeenMs()
-
- if (blocks.containsKey(blockId)) {
+
+ if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
- val originalLevel: StorageLevel = blocks.get(blockId)
-
+ val originalLevel: StorageLevel = _blocks.get(blockId)
+
if (originalLevel.useMemory) {
- remainingMem += memSize
+ _remainingMem += memSize
}
}
-
+
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
- blocks.put(blockId, storageLevel)
+ _blocks.put(blockId, storageLevel)
if (storageLevel.useMemory) {
- remainingMem -= memSize
+ _remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(remainingMem)))
+ Utils.memoryBytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s:%d (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
- } else if (blocks.containsKey(blockId)) {
+ } else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
- val originalLevel: StorageLevel = blocks.get(blockId)
- blocks.remove(blockId)
+ val originalLevel: StorageLevel = _blocks.get(blockId)
+ _blocks.remove(blockId)
if (originalLevel.useMemory) {
- remainingMem += memSize
+ _remainingMem += memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(remainingMem)))
+ Utils.memoryBytesToString(_remainingMem)))
}
if (originalLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
@@ -149,20 +152,14 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
- def getLastSeenMs: Long = {
- return lastSeenMs
- }
-
- def getRemainedMem: Long = {
- return remainingMem
- }
+ def remainingMem: Long = _remainingMem
- override def toString: String = {
- return "BlockManagerInfo " + timeMs + " " + remainingMem
- }
+ def lastSeenMs: Long = _lastSeenMs
+
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
- blocks.clear()
+ _blocks.clear()
}
}
@@ -170,7 +167,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
-
+
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
@@ -197,7 +194,10 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case GetPeers(blockManagerId, size) =>
getPeersDeterministic(blockManagerId, size)
/*getPeers(blockManagerId, size)*/
-
+
+ case GetMemoryStatus =>
+ getMemoryStatus
+
case RemoveHost(host) =>
removeHost(host)
sender ! true
@@ -207,10 +207,18 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
context.stop(self)
- case other =>
+ case other =>
logInfo("Got unknown message: " + other)
}
-
+
+ // Reply to the sender with a map from each block manager id to its (max memory, remaining memory).
+ private def getMemoryStatus() {
+ val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ (blockManagerId, (info.maxMem, info.remainingMem))
+ }.toMap
+ sender ! res
+ }
+
private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
@@ -224,25 +232,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
-
+
private def heartBeat(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long) {
-
+
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
-
+
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
-
+
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
+
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {
locations = blockInfo.get(blockId)._2
@@ -250,19 +258,19 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
locations = new HashSet[BlockManagerId]
blockInfo.put(blockId, (storageLevel.replication, locations))
}
-
+
if (storageLevel.isValid) {
locations += blockManagerId
} else {
locations.remove(blockManagerId)
}
-
+
if (locations.size == 0) {
blockInfo.remove(blockId)
}
sender ! true
}
-
+
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
@@ -270,7 +278,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ Utils.getUsedTimeMs(startTimeMs))
sender ! res.toSeq
} else {
@@ -279,7 +287,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! res
}
}
-
+
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
@@ -295,7 +303,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
return res.toSeq
}
}
-
+
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
for (blockId <- blockIds) {
@@ -316,7 +324,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
sender ! res.toSeq
}
-
+
private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
@@ -362,7 +370,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
}
-
+
def stop() {
if (masterActor != null) {
communicate(StopBlockManagerMaster)
@@ -389,7 +397,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
throw new SparkException("Error reply received from BlockManagerMaster")
}
}
-
+
def notifyADeadHost(host: String) {
communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
logInfo("Removed " + host + " successfully in notifyADeadHost")
@@ -409,7 +417,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
+
try {
communicate(msg)
logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
@@ -421,19 +429,19 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return false
}
}
-
+
def mustHeartBeat(msg: HeartBeat) {
while (! syncHeartBeat(msg)) {
logWarning("Failed to send heartbeat" + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
}
-
+
def syncHeartBeat(msg: HeartBeat): Boolean = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-
+
try {
communicate(msg)
logDebug("Heartbeat sent successfully")
@@ -445,7 +453,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return false
}
}
-
+
def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
var res = syncGetLocations(msg)
while (res == null) {
@@ -455,7 +463,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return res
}
-
+
def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
@@ -488,13 +496,13 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return res
}
-
+
def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
+
try {
val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
if (answer != null) {
@@ -512,7 +520,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return null
}
}
-
+
def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
var res = syncGetPeers(msg)
while ((res == null) || (res.length != msg.size)) {
@@ -520,10 +528,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetPeers(msg)
}
-
+
return res
}
-
+
def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
@@ -545,4 +553,8 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return null
}
}
+
+ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
+ }
}
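End to end, the actor replies to GetMemoryStatus with a Map[BlockManagerId, (Long, Long)], and SparkContext.getSlavesMemoryStatus re-keys the same tuples by "ip:port". A minimal sketch of that re-keying step with a simplified stand-in for BlockManagerId (the real class lives in spark.storage; this one exists only to make the example self-contained):

    // Simplified stand-in for spark.storage.BlockManagerId, for illustration only.
    case class BlockManagerId(ip: String, port: Int)

    // Shape of the actor's reply: per block manager, (max memory, remaining memory) in bytes.
    val fromMaster: Map[BlockManagerId, (Long, Long)] = Map(
      BlockManagerId("10.0.0.1", 10902) -> (512L * 1024 * 1024, 480L * 1024 * 1024),
      BlockManagerId("10.0.0.2", 10902) -> (512L * 1024 * 1024, 300L * 1024 * 1024))

    // What SparkContext.getSlavesMemoryStatus exposes: the same values keyed by "ip:port".
    val forUsers: Map[String, (Long, Long)] =
      fromMaster.map { case (id, mem) => (id.ip + ":" + id.port, mem) }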