path: root/core/src/main/scala/spark/storage
author    Reynold Xin <rxin@cs.berkeley.edu>  2013-02-19 12:44:05 -0800
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-02-19 12:44:05 -0800
commit    19d3b059e3622e5e20b56b4bf1e46cea22b6bee7 (patch)
tree      6174901ed0f65ba052e88c48193d69f24c9fe65a /core/src/main/scala/spark/storage
parent    81c4d19c612208b932f8579427cb895385336c6e (diff)
parent    03d847999e8c54684128573b94973544026081b2 (diff)
Merge branch 'master' into graph
Diffstat (limited to 'core/src/main/scala/spark/storage')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala       |  5
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala |  6
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala     | 39
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala       | 24
4 files changed, 36 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index c61fd75c2b..2462721fb8 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -513,7 +513,7 @@ class BlockManager(
}
}
- // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
+ // Partition local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
for ((address, blockInfos) <- blocksByAddress) {
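
The comment fix above also documents real behavior: remote blocks are packed into FetchRequests whose sizes are bounded so that several can be in flight at once. A minimal sketch of that packing, with illustrative names (the real BlockManager derives a target chunk size from maxBytesInFlight; the division by 5 here is an assumption for the sketch):

    import scala.collection.mutable.ArrayBuffer

    case class FetchRequest(address: String, blocks: Seq[(String, Long)]) {
      val size = blocks.map(_._2).sum
    }

    // Greedily pack (blockId, size) pairs into chunks of roughly
    // maxBytesInFlight / 5, so several requests can be outstanding
    // without exceeding the in-flight byte cap.
    def packRequests(address: String, blocks: Seq[(String, Long)],
        maxBytesInFlight: Long): Seq[FetchRequest] = {
      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
      val requests = new ArrayBuffer[FetchRequest]
      var current = new ArrayBuffer[(String, Long)]
      var currentSize = 0L
      for ((blockId, size) <- blocks) {
        current += ((blockId, size))
        currentSize += size
        if (currentSize >= targetRequestSize) {
          requests += FetchRequest(address, current.toSeq)
          current = new ArrayBuffer[(String, Long)]
          currentSize = 0L
        }
      }
      if (current.nonEmpty) requests += FetchRequest(address, current.toSeq)
      requests.toSeq
    }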
@@ -585,7 +585,7 @@ class BlockManager(
resultsGotten += 1
val result = results.take()
bytesInFlight -= result.size
- if (!fetchRequests.isEmpty &&
+ while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
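
The if-to-while change above is a real fix, not a cleanup: when a large result completes, enough headroom may open up for several queued requests, but `if` dispatched at most one per completed result, leaving the network underutilized. A self-contained model of the corrected loop (the queue of raw request sizes is illustrative):

    import scala.collection.mutable.Queue

    object FetchThrottle {
      val maxBytesInFlight = 48L * 1024 * 1024
      val fetchRequests = Queue[Long]()   // sizes of pending requests
      var bytesInFlight = 0L

      // On each completed result: free its bytes, then drain as many queued
      // requests as now fit under the cap. With `if` instead of `while`, at
      // most one request would be sent here even if several small ones fit.
      def onResult(resultSize: Long)(send: Long => Unit) {
        bytesInFlight -= resultSize
        while (fetchRequests.nonEmpty &&
            (bytesInFlight == 0 || bytesInFlight + fetchRequests.front <= maxBytesInFlight)) {
          val req = fetchRequests.dequeue()
          bytesInFlight += req
          send(req)
        }
      }
    }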
@@ -950,6 +950,7 @@ class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
+ metadataCleaner.cancel()
logInfo("BlockManager stopped")
}
}
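
The added metadataCleaner.cancel() stops the recurring metadata-cleanup timer when the BlockManager shuts down; without it, the timer thread would keep firing against stores that were just cleared. A minimal sketch of that pattern, assuming a java.util.Timer-based cleaner (names illustrative):

    import java.util.{Timer, TimerTask}

    // A recurring cleanup task that must be cancelled on shutdown,
    // mirroring the metadataCleaner.cancel() call added to stop().
    class Cleaner(periodMs: Long)(cleanup: () => Unit) {
      private val timer = new Timer("metadata-cleaner", true)  // daemon thread
      timer.schedule(new TimerTask { def run() { cleanup() } }, periodMs, periodMs)
      def cancel() { timer.cancel() }
    }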
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 36398095a2..7389bee150 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster(
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
- val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
- val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
var driverActor: ActorRef = {
@@ -117,6 +115,10 @@ private[spark] class BlockManagerMaster(
askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
+ def getStorageStatus: Array[StorageStatus] = {
+ askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+ }
+
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null) {
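
The new getStorageStatus wraps the actor ask and converts the ArrayBuffer reply to an Array, so callers no longer cast an untyped future result themselves. A hedged usage sketch (the blockManagerMaster reference is assumed to be in scope):

    // Summarize cluster memory from the driver's view of all block managers.
    val statuses: Array[StorageStatus] = blockManagerMaster.getStorageStatus
    val maxMem = statuses.map(_.maxMem).sum
    val remaining = statuses.map(_.memRemaining).sum
    println("Using %d of %d bytes".format(maxMem - remaining, maxMem))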
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index eda320fa47..9e6721ec17 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,13 +1,10 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
-import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
path("") {
completeWith {
// Request the current storage status from the Master
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- // Calculate macro-level statistics
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
+ val storageStatusList = sc.getExecutorStorageStatus
+ // Calculate macro-level statistics
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
}
} ~
path("rdd") {
parameter("id") { id =>
completeWith {
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- val prefix = "rdd_" + id.toString
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
- }
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getExecutorStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
}
}
} ~
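
Two details worth noting in the rewritten handlers: the UI now reads sc.getExecutorStorageStatus synchronously instead of asking the master actor and mapping over a future, and diskSpaceUsed uses reduceOption(_+_).getOrElse(0L) rather than reduce because the flatMap over block disk sizes is empty when nothing has spilled to disk. A standalone illustration of the latter:

    val diskSizes: Seq[Long] = Seq.empty
    // diskSizes.reduce(_ + _) would throw UnsupportedOperationException here
    val diskSpaceUsed = diskSizes.reduceOption(_ + _).getOrElse(0L)  // 0L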
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index a10e3a95c6..dec47a9d41 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -1,6 +1,6 @@
package spark.storage
-import spark.SparkContext
+import spark.{Utils, SparkContext}
import BlockManagerMasterActor.BlockStatus
private[spark]
@@ -22,8 +22,13 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
- numPartitions: Int, memSize: Long, diskSize: Long)
-
+ numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
+ override def toString = {
+ import Utils.memoryBytesToString
+ "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
+ storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+ }
+}
/* Helper methods for storage-related objects */
private[spark]
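
With the new numCachedPartitions field, toString distinguishes cached from total partitions. A sketch of how it renders (the values and the exact StorageLevel formatting are illustrative):

    // An RDD with 10 of its 20 partitions cached in memory.
    val info = RDDInfo(1, "pairs", StorageLevel.MEMORY_ONLY,
      numCachedPartitions = 10, numPartitions = 20,
      memSize = 100L * 1024 * 1024, diskSize = 0L)
    println(info)
    // RDD "pairs" (1) Storage: StorageLevel(...); CachedPartitions: 10;
    // TotalPartitions: 20; MemorySize: 100.0 MB; DiskSize: 0.0 B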
@@ -38,8 +43,6 @@ object StorageUtils {
/* Given a list of BlockStatus objects, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {
- // Find all RDD Blocks (ignore broadcast variables)
- val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
// Group by rddId, ignore the partition name
val groupedRddBlocks = infos.groupBy { case(k, v) =>
@@ -56,10 +59,11 @@ object StorageUtils {
// Find the id of the RDD, e.g. rdd_1 => 1
val rddId = rddKey.split("_").last.toInt
// Get the friendly name for the rdd, if available.
- val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
- val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-
- RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+ val rdd = sc.persistentRdds(rddId)
+ val rddName = Option(rdd.name).getOrElse(rddKey)
+ val rddStorageLevel = rdd.getStorageLevel
+
+ RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
}.toArray
}
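
For context on the grouping above: block ids like "rdd_1_0" collapse to an RDD key by stripping the trailing partition index, and the numeric id is then recovered from the key's last segment. A small illustration (block ids made up):

    val blockIds = Seq("rdd_1_0", "rdd_1_1", "rdd_2_0")
    val grouped = blockIds.groupBy(id => id.substring(0, id.lastIndexOf('_')))
    // Map("rdd_1" -> List("rdd_1_0", "rdd_1_1"), "rdd_2" -> List("rdd_2_0"))
    val rddId = "rdd_1".split("_").last.toInt   // 1, as in rddKey.split("_").last.toInt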
@@ -75,4 +79,4 @@ object StorageUtils {
}
-}
\ No newline at end of file
+}