author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-23 22:04:35 -0700
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-23 22:04:35 -0700
commit | d290e964ea190e6e782461134119fa6d1f90301f (patch)
tree | 0b6d54873ae05ee8e6db68142795220e07f083f1 /core/src/main/scala
parent | 0bd20c63e2ed3ca8e3577c2805678d9d34218ffd (diff)
parent | 3b97124604de3e359ebd53df96e79c64e7d82517 (diff)
download | spark-d290e964ea190e6e782461134119fa6d1f90301f.tar.gz, spark-d290e964ea190e6e782461134119fa6d1f90301f.tar.bz2, spark-d290e964ea190e6e782461134119fa6d1f90301f.zip
Merge pull request #281 from rxin/memreport
Added a method to report slave memory status; force serialize accumulator update in local mode.
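For context, a minimal sketch of how a driver program might call the new `getSlavesMemoryStatus` method. Only the method itself comes from the diff below; the object name, master URL, and SparkContext constructor arguments are illustrative for the Spark of this era:

```scala
import spark.SparkContext

// Hypothetical driver program; only getSlavesMemoryStatus comes from this PR.
object MemReportExample {
  def main(args: Array[String]) {
    // "local[2]" and the job name are illustrative constructor arguments.
    val sc = new SparkContext("local[2]", "MemReportExample")

    // getSlavesMemoryStatus returns Map[String, (Long, Long)]: each slave's
    // block manager, keyed as "ip:port", mapped to (max memory available for
    // caching, remaining memory available for caching), both in bytes.
    for ((slave, (maxMem, remainingMem)) <- sc.getSlavesMemoryStatus) {
      println("%s: %d of %d bytes free for caching".format(slave, remainingMem, maxMem))
    }

    sc.stop()
  }
}
```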
Diffstat (limited to 'core/src/main/scala')
3 files changed, 93 insertions, 71 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index becf737597..4975e2a9fc 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -45,7 +45,6 @@ import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerMaster
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -199,7 +198,7 @@ class SparkContext(
     parallelize(seq, numSlices)
   }
 
-  /** 
+  /**
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
@@ -400,7 +399,7 @@ class SparkContext(
     new Accumulable(initialValue, param)
   }
 
-  /** 
+  /**
    * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
    * reading it in distributed functions. The variable will be sent to each cluster only once.
    */
@@ -427,6 +426,16 @@ class SparkContext(
   }
 
   /**
+   * Return a map from the slave to the max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+      (blockManagerId.ip + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
    * Clear the job's list of files added by `addFile` so that they do not get donwloaded to
    * any new nodes.
    */
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index b84b4dc2ed..eb20fe41b2 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -30,12 +30,12 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
   val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
 
   val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
-  
+
   // TODO: Need to take into account stage priority in scheduling
 
   override def start() { }
 
-  override def setListener(listener: TaskSchedulerListener) { 
+  override def setListener(listener: TaskSchedulerListener) {
     this.listener = listener
   }
 
@@ -78,7 +78,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
         // on in development (so when users move their local Spark programs
         // to the cluster, they don't get surprised by serialization errors).
         val resultToReturn = ser.deserialize[Any](ser.serialize(result))
-        val accumUpdates = Accumulators.values
+        val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
+          ser.serialize(Accumulators.values))
         logInfo("Finished task " + idInJob)
         listener.taskEnded(task, Success, resultToReturn, accumUpdates)
       } catch {
@@ -126,7 +127,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
       }
     }
   }
-  
+
   override def stop() {
     threadPool.shutdownNow()
   }
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7bfa31ac3d..b3345623b3 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -71,76 +71,79 @@ object HeartBeat {
     Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
   }
 }
-  
+
 private[spark]
 case class GetLocations(blockId: String) extends ToBlockManagerMaster
 
 private[spark]
 case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-  
+
 private[spark]
 case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-  
+
 private[spark]
 case class RemoveHost(host: String) extends ToBlockManagerMaster
 
 private[spark]
 case object StopBlockManagerMaster extends ToBlockManagerMaster
 
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
 private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-  
+
   class BlockManagerInfo(
       val blockManagerId: BlockManagerId,
       timeMs: Long,
       val maxMem: Long) {
-    private var lastSeenMs = timeMs
-    private var remainingMem = maxMem
-    private val blocks = new JHashMap[String, StorageLevel]
+    private var _lastSeenMs = timeMs
+    private var _remainingMem = maxMem
+    private val _blocks = new JHashMap[String, StorageLevel]
 
     logInfo("Registering block manager %s:%d with %s RAM".format(
       blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-    
+
     def updateLastSeenMs() {
-      lastSeenMs = System.currentTimeMillis() / 1000
+      _lastSeenMs = System.currentTimeMillis() / 1000
     }
-    
+
     def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
                         diskSize: Long) : Unit = synchronized {
       updateLastSeenMs()
-      
-      if (blocks.containsKey(blockId)) {
+
+      if (_blocks.containsKey(blockId)) {
         // The block exists on the slave already.
-        val originalLevel: StorageLevel = blocks.get(blockId)
-        
+        val originalLevel: StorageLevel = _blocks.get(blockId)
+
         if (originalLevel.useMemory) {
-          remainingMem += memSize
+          _remainingMem += memSize
         }
       }
-      
+
       if (storageLevel.isValid) {
         // isValid means it is either stored in-memory or on-disk.
-        blocks.put(blockId, storageLevel)
+        _blocks.put(blockId, storageLevel)
         if (storageLevel.useMemory) {
-          remainingMem -= memSize
+          _remainingMem -= memSize
           logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
             blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
-            Utils.memoryBytesToString(remainingMem)))
+            Utils.memoryBytesToString(_remainingMem)))
         }
         if (storageLevel.useDisk) {
           logInfo("Added %s on disk on %s:%d (size: %s)".format(
             blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
         }
-      } else if (blocks.containsKey(blockId)) {
+      } else if (_blocks.containsKey(blockId)) {
         // If isValid is not true, drop the block.
-        val originalLevel: StorageLevel = blocks.get(blockId)
-        blocks.remove(blockId)
+        val originalLevel: StorageLevel = _blocks.get(blockId)
+        _blocks.remove(blockId)
         if (originalLevel.useMemory) {
-          remainingMem += memSize
+          _remainingMem += memSize
           logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
             blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
-            Utils.memoryBytesToString(remainingMem)))
+            Utils.memoryBytesToString(_remainingMem)))
         }
         if (originalLevel.useDisk) {
           logInfo("Removed %s on %s:%d on disk (size: %s)".format(
@@ -149,20 +152,14 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       }
     }
 
-    def getLastSeenMs: Long = {
-      return lastSeenMs
-    }
-    
-    def getRemainedMem: Long = {
-      return remainingMem
-    }
+    def remainingMem: Long = _remainingMem
 
-    override def toString: String = {
-      return "BlockManagerInfo " + timeMs + " " + remainingMem
-    }
+    def lastSeenMs: Long = _lastSeenMs
+
+    override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
     def clear() {
-      blocks.clear()
+      _blocks.clear()
     }
   }
 
@@ -170,7 +167,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
   private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
 
   initLogging()
-  
+
   def removeHost(host: String) {
     logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
     logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
@@ -197,7 +194,10 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     case GetPeers(blockManagerId, size) =>
       getPeersDeterministic(blockManagerId, size)
       /*getPeers(blockManagerId, size)*/
-      
+
+    case GetMemoryStatus =>
+      getMemoryStatus
+
     case RemoveHost(host) =>
       removeHost(host)
       sender ! true
@@ -207,10 +207,18 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       sender ! true
       context.stop(self)
 
-    case other => 
+    case other =>
       logInfo("Got unknown message: " + other)
   }
-  
+
+  // Return a map from the block manager id to max memory and remaining memory.
+  private def getMemoryStatus() {
+    val res = blockManagerInfo.map { case(blockManagerId, info) =>
+      (blockManagerId, (info.maxMem, info.remainingMem))
+    }.toMap
+    sender ! res
+  }
+
   private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
@@ -224,25 +232,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     sender ! true
   }
-  
+
   private def heartBeat(
       blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
       memSize: Long,
      diskSize: Long) {
-    
+
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " " + blockId + " "
-    
+
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
       logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
       sender ! true
     }
-    
+
     blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-    
+
     var locations: HashSet[BlockManagerId] = null
     if (blockInfo.containsKey(blockId)) {
       locations = blockInfo.get(blockId)._2
@@ -250,19 +258,19 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       locations = new HashSet[BlockManagerId]
       blockInfo.put(blockId, (storageLevel.replication, locations))
     }
-    
+
     if (storageLevel.isValid) {
       locations += blockManagerId
     } else {
       locations.remove(blockManagerId)
     }
-    
+
     if (locations.size == 0) {
       blockInfo.remove(blockId)
     }
     sender ! true
   }
-  
+
   private def getLocations(blockId: String) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockId + " "
@@ -270,7 +278,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     if (blockInfo.containsKey(blockId)) {
       var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
       res.appendAll(blockInfo.get(blockId)._2)
-      logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at " 
+      logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
         + Utils.getUsedTimeMs(startTimeMs))
       sender ! res.toSeq
     } else {
@@ -279,7 +287,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       sender ! res
     }
   }
-  
+
   private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
     def getLocations(blockId: String): Seq[BlockManagerId] = {
       val tmp = blockId
@@ -295,7 +303,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
         return res.toSeq
       }
     }
-    
+
     logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
     var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
     for (blockId <- blockIds) {
@@ -316,7 +324,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     }
     sender ! res.toSeq
   }
-  
+
   private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
     var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
     var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
@@ -362,7 +370,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     logInfo("Connecting to BlockManagerMaster: " + url)
     masterActor = actorSystem.actorFor(url)
   }
-  
+
   def stop() {
     if (masterActor != null) {
       communicate(StopBlockManagerMaster)
@@ -389,7 +397,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       throw new SparkException("Error reply received from BlockManagerMaster")
     }
   }
-  
+
   def notifyADeadHost(host: String) {
     communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
     logInfo("Removed " + host + " successfully in notifyADeadHost")
@@ -409,7 +417,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
     logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-    
+
     try {
       communicate(msg)
       logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
@@ -421,19 +429,19 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       return false
     }
   }
-  
+
   def mustHeartBeat(msg: HeartBeat) {
     while (! syncHeartBeat(msg)) {
       logWarning("Failed to send heartbeat" + msg)
       Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
     }
   }
-  
+
   def syncHeartBeat(msg: HeartBeat): Boolean = {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
     logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-    
+
     try {
       communicate(msg)
       logDebug("Heartbeat sent successfully")
@@ -445,7 +453,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       return false
     }
   }
-  
+
   def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
     var res = syncGetLocations(msg)
     while (res == null) {
@@ -455,7 +463,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     }
     return res
   }
-  
+
   def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
@@ -488,13 +496,13 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     }
     return res
   }
-  
+
   def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): Seq[Seq[BlockManagerId]] = {
     val startTimeMs = System.currentTimeMillis
     val tmp = " msg " + msg + " "
     logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-    
+
     try {
       val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
       if (answer != null) {
@@ -512,7 +520,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       return null
     }
   }
-  
+
   def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
     var res = syncGetPeers(msg)
     while ((res == null) || (res.length != msg.size)) {
@@ -520,10 +528,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
       res = syncGetPeers(msg)
     }
-    
+
     return res
   }
-  
+
   def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
     val startTimeMs = System.currentTimeMillis
     val tmp = " msg " + msg + " "
@@ -545,4 +553,8 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       return null
     }
   }
+
+  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+    askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
+  }
 }
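The other half of the change, forcing accumulator updates through the serializer in LocalScheduler, reuses the trick the scheduler already applied to task results: round-trip the value through serialization even in local mode, so a non-serializable accumulator fails fast during development instead of only failing once the job runs on a cluster. A self-contained sketch of that round-trip idiom, using plain JDK serialization rather than Spark's pluggable serializer:

```scala
import java.io._

// A minimal sketch of the serialize/deserialize round-trip this patch applies
// to accumulator updates; plain JDK serialization stands in for Spark's ser.
object SerializeRoundTrip {
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)  // throws NotSerializableException for bad values
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]) {
    // The same shape LocalScheduler round-trips: accumulator id -> update.
    val updates = collection.mutable.Map[Long, Any](1L -> 42, 2L -> "sum")
    // Succeeds (mutable.HashMap is serializable) and yields a deep copy,
    // which also keeps the driver from aliasing the task's mutable state.
    println(roundTrip(updates))
  }
}
```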