author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-02 23:44:13 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-02 23:44:13 -0800
commit     88ee6163a1b5d6fccfbd6c7e9ae6fd4968b0695e (patch)
tree       67c2a8a58503dfa644a29a4e2f567a7408b4b40f /core
parent     cd4ca936798f327357eff1aa27c238dbce18cb8d (diff)
parent     c6190067ae40cf457b7f2e58619904b6fd2b1cb6 (diff)
Merge pull request #422 from squito/blockmanager_info
RDDInfo available from SparkContext
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                 | 18
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala   |  4
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala       | 39
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala         | 24
-rw-r--r--  core/src/main/twirl/spark/storage/rdd.scala.html             |  6
-rw-r--r--  core/src/main/twirl/spark/storage/rdd_table.scala.html       |  6
6 files changed, 59 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2ed458c6fe..b95049da0c 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import storage.BlockManagerUI
import util.{MetadataCleaner, TimeStampedHashMap}
+import storage.{StorageStatus, StorageUtils, RDDInfo}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -468,13 +469,28 @@ class SparkContext(
* Return a map from each slave to the maximum memory available for caching and the remaining
* memory available for caching.
*/
- def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+ def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.ip + ":" + blockManagerId.port, mem)
}
}
/**
+ * Return information about what RDDs are cached, whether they are in memory or on disk, how
+ * much space they take, etc.
+ */
+ def getRDDStorageInfo : Array[RDDInfo] = {
+ StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+ }
+
+ /**
+ * Return information about blocks stored in all of the slaves
+ */
+ def getExecutorStorageStatus : Array[StorageStatus] = {
+ env.blockManager.master.getStorageStatus
+ }
+
+ /**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
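The three methods added above are the public surface of this change. A minimal usage sketch of how a driver program might call them (hedged: the local master URL, job name, and sample RDD are illustrative, not part of this patch; only getExecutorMemoryStatus, getRDDStorageInfo, and getExecutorStorageStatus come from it):

    import spark.SparkContext
    import spark.storage.{RDDInfo, StorageStatus}

    object StorageInfoDemo {
      def main(args: Array[String]) {
        // Illustrative setup; master URL and job name are placeholders.
        val sc = new SparkContext("local", "StorageInfoDemo")

        // Cache something so there is storage state to report.
        val rdd = sc.parallelize(1 to 1000, 10).cache()
        rdd.count() // force computation so the partitions are actually cached

        // Max and remaining cache memory per executor, keyed by "host:port".
        sc.getExecutorMemoryStatus.foreach { case (hostPort, (maxMem, remainingMem)) =>
          println(hostPort + ": " + remainingMem + " / " + maxMem + " bytes free")
        }

        // One RDDInfo per cached RDD; its toString is the summary defined
        // below in StorageUtils.scala.
        sc.getRDDStorageInfo.foreach(println)

        // Raw per-executor block information.
        val statuses: Array[StorageStatus] = sc.getExecutorStorageStatus
        println(statuses.length + " executors reporting")

        sc.stop()
      }
    }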
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7be6b9fa87..7389bee150 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -115,6 +115,10 @@ private[spark] class BlockManagerMaster(
askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
+ def getStorageStatus: Array[StorageStatus] = {
+ askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+ }
+
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null) {
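getStorageStatus reuses the same blocking askDriverWithReply helper as getMemoryStatus just above it. As a rough sketch of what that pattern expands to with Akka 2.0-era calls (the free-standing method and the 10-second timeout are assumptions for illustration; the real helper lives inside BlockManagerMaster with a configured timeout):

    import akka.actor.ActorRef
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._
    import scala.collection.mutable.ArrayBuffer

    // Rough equivalent of askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus):
    // send the message, block for the reply, and cast it to the expected type.
    def askForStorageStatus(driverActor: ActorRef): Array[StorageStatus] = {
      implicit val timeout = Timeout(10 seconds) // placeholder timeout
      val future = driverActor ? GetStorageStatus
      Await.result(future, timeout.duration)
        .asInstanceOf[ArrayBuffer[StorageStatus]]
        .toArray // convert the mutable buffer to an Array at the API boundary
    }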
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index eda320fa47..9e6721ec17 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,13 +1,10 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
-import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
path("") {
completeWith {
// Request the current storage status from the Master
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- // Calculate macro-level statistics
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
+ val storageStatusList = sc.getExecutorStorageStatus
+ // Calculate macro-level statistics
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
}
} ~
path("rdd") {
parameter("id") { id =>
completeWith {
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- val prefix = "rdd_" + id.toString
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
- }
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getExecutorStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
}
}
} ~
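Both routes now call sc.getExecutorStorageStatus synchronously instead of asking the master actor and mapping over a future, so completeWith can take a plain value. The macro-level statistics themselves are simple folds; a standalone sketch of the same computation, assuming a SparkContext sc is in scope:

    val statuses = sc.getExecutorStorageStatus

    // Every registered executor reports maxMem and memRemaining, so plain
    // reduce is safe as long as at least one executor exists.
    val maxMem       = statuses.map(_.maxMem).reduce(_ + _)
    val remainingMem = statuses.map(_.memRemaining).reduce(_ + _)

    // Disk usage is per *block*, and there may be no blocks cached yet;
    // reduceOption(...).getOrElse(0L) guards the empty case, where a plain
    // reduce would throw an UnsupportedOperationException.
    val diskSpaceUsed = statuses
      .flatMap(_.blocks.values.map(_.diskSize))
      .reduceOption(_ + _)
      .getOrElse(0L)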
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index a10e3a95c6..5f72b67b2c 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -1,6 +1,6 @@
package spark.storage
-import spark.SparkContext
+import spark.{Utils, SparkContext}
import BlockManagerMasterActor.BlockStatus
private[spark]
@@ -22,8 +22,13 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
- numPartitions: Int, memSize: Long, diskSize: Long)
-
+ numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
+ override def toString = {
+ import Utils.memoryBytesToString
+ "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
+ storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+ }
+}
/* Helper methods for storage-related objects */
private[spark]
@@ -38,8 +43,6 @@ object StorageUtils {
/* Given a list of BlockStatus objects, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {
- // Find all RDD Blocks (ignore broadcast variables)
- val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
// Group by rddId, ignore the partition name
val groupedRddBlocks = infos.groupBy { case(k, v) =>
@@ -56,10 +59,11 @@ object StorageUtils {
// Find the id of the RDD, e.g. rdd_1 => 1
val rddId = rddKey.split("_").last.toInt
// Get the friendly name for the rdd, if available.
- val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
- val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-
- RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+ val rdd = sc.persistentRdds(rddId)
+ val rddName = Option(rdd.name).getOrElse(rddKey)
+ val rddStorageLevel = rdd.getStorageLevel
+
+ RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize)
}.toArray
}
@@ -75,4 +79,4 @@ object StorageUtils {
}
-}
\ No newline at end of file
+}
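The grouping above relies on the naming convention for cached RDD blocks: keys look like rdd_<rddId>_<splitIndex>. A self-contained sketch of that parsing (the sample keys are made up, and the real code groups a Map[String, BlockStatus] rather than bare names):

    val blockNames = Seq("rdd_1_0", "rdd_1_1", "rdd_3_0") // hypothetical cached blocks

    // Group by the RDD key, i.e. everything before the last underscore:
    // "rdd_1_0" and "rdd_1_1" both map to "rdd_1".
    val byRdd = blockNames.groupBy(name => name.substring(0, name.lastIndexOf('_')))

    byRdd.foreach { case (rddKey, blocks) =>
      val rddId = rddKey.split("_").last.toInt // "rdd_1" => 1, as in the comment above
      println("RDD " + rddId + ": " + blocks.length + " cached partitions")
    }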
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
index ac7f8c981f..d85addeb17 100644
--- a/core/src/main/twirl/spark/storage/rdd.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -11,7 +11,11 @@
<strong>Storage Level:</strong>
@(rddInfo.storageLevel.description)
<li>
- <strong>Partitions:</strong>
+ <strong>Cached Partitions:</strong>
+ @(rddInfo.numCachedPartitions)
+ </li>
+ <li>
+ <strong>Total Partitions:</strong>
@(rddInfo.numPartitions)
</li>
<li>
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
index af801cf229..a51e64aed0 100644
--- a/core/src/main/twirl/spark/storage/rdd_table.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -6,7 +6,8 @@
<tr>
<th>RDD Name</th>
<th>Storage Level</th>
- <th>Partitions</th>
+ <th>Cached Partitions</th>
+ <th>Fraction Partitions Cached</th>
<th>Size in Memory</th>
<th>Size on Disk</th>
</tr>
@@ -21,7 +22,8 @@
</td>
<td>@(rdd.storageLevel.description)
</td>
- <td>@rdd.numPartitions</td>
+ <td>@rdd.numCachedPartitions</td>
+ <td>@(rdd.numCachedPartitions / rdd.numPartitions.toDouble)</td>
<td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
<td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
</tr>
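The new ratio column depends on that .toDouble conversion: numCachedPartitions and numPartitions are both Int, so without it the division would truncate to 0 for any partially cached RDD. A tiny sketch (the values are invented, and the percent formatting is our addition; the template renders the raw ratio):

    val numCachedPartitions = 7
    val numPartitions = 10

    val wrong = numCachedPartitions / numPartitions          // Int division: 0
    val right = numCachedPartitions / numPartitions.toDouble // 0.7

    println("%.0f%% of partitions cached".format(right * 100)) // prints: 70% of partitions cached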