author     Imran Rashid <imran@quantifind.com>   2013-01-22 18:48:43 -0800
committer  Imran Rashid <imran@quantifind.com>   2013-01-28 20:07:53 -0800
commit     a423ee546c389b5ce0d2117299456712370d7ad1 (patch)
tree       fcabff7633c872c3dbbfd791a77f772640d8665d
parent     f03d9760fd8ac67fd0865cb355ba75d2eff507fe (diff)
expose RDD & storage info directly via SparkContext
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                | 16
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  |  4
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala      | 39
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala        | 10
4 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77036c1275..be992250a9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import storage.BlockManagerUI
import util.{MetadataCleaner, TimeStampedHashMap}
+import storage.{StorageStatus, StorageUtils, RDDInfo}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -474,6 +475,21 @@ class SparkContext(
}
/**
+ * Return information about which RDDs are cached, whether they are in memory or on disk,
+ * how much space they take, etc.
+ */
+ def getRDDStorageInfo: Array[RDDInfo] = {
+ StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this)
+ }
+
+ /**
+ * Return information about the blocks stored on all of the slaves.
+ */
+ def getSlavesStorageStatus: Array[StorageStatus] = {
+ env.blockManager.master.getStorageStatus
+ }
+
+ /**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
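
For illustration, a minimal driver-side sketch of how the two new accessors might be called once this patch is applied (the master URL, app name, and printed output are placeholders, not part of this commit):

    import spark.SparkContext
    import spark.storage.{RDDInfo, StorageStatus}

    val sc = new SparkContext("local", "storage-info-demo")
    val data = sc.parallelize(1 to 1000).cache()
    data.count()  // force the cached blocks to materialize

    // Per-RDD view: id, name, storage level, block count, memory/disk footprint
    val rddInfos: Array[RDDInfo] = sc.getRDDStorageInfo
    rddInfos.foreach(println)

    // Per-slave view: raw block-level status reported by each BlockManager
    val slaveStatus: Array[StorageStatus] = sc.getSlavesStorageStatus
    println("slaves reporting: " + slaveStatus.length)

    sc.stop()
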
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 55ff1dde9c..c7ee76f0b7 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -118,6 +118,10 @@ private[spark] class BlockManagerMaster(
askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
+ def getStorageStatus: Array[StorageStatus] = {
+ askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+ }
+
/** Stop the master actor, called only on the Spark master node */
def stop() {
if (masterActor != null) {
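
The new accessor follows the same ask-with-retry pattern as the surrounding methods: the master actor answers GetStorageStatus with a mutable ArrayBuffer[StorageStatus], which is copied into an Array before it reaches callers. A simplified sketch of the underlying exchange, without the retry loop (the timeout value is illustrative, and this assumes the Akka 2.0-era API already imported elsewhere in this file):

    import akka.pattern.ask
    import akka.dispatch.Await
    import akka.util.Timeout
    import akka.util.duration._
    import scala.collection.mutable.ArrayBuffer

    implicit val timeout = Timeout(10 seconds)
    val future = masterActor ? GetStorageStatus
    // The reply arrives untyped, so cast and convert, as askMasterWithRetry does internally
    val storageStatus = Await.result(future, timeout.duration)
      .asInstanceOf[ArrayBuffer[StorageStatus]]
      .toArray
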
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index eda320fa47..52f6d1b657 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,13 +1,10 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
-import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
path("") {
completeWith {
// Request the current storage status from the Master
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- // Calculate macro-level statistics
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
+ val storageStatusList = sc.getSlavesStorageStatus
+ // Calculate macro-level statistics
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
}
} ~
path("rdd") {
parameter("id") { id =>
completeWith {
- val future = blockManagerMaster ? GetStorageStatus
- future.map { status =>
- val prefix = "rdd_" + id.toString
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
- }
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getSlavesStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
}
}
} ~
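
Because the storage status now comes straight from SparkContext, the macro-level aggregation the index page performs is equally available to user code. A sketch under that assumption (the accessor names match this patch; the output formatting is illustrative):

    val storageStatusList = sc.getSlavesStorageStatus
    val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
    val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
    // reduceOption guards against the case where no slave holds any blocks yet
    val diskSpaceUsed = storageStatusList
      .flatMap(_.blocks.values.map(_.diskSize))
      .reduceOption(_ + _)
      .getOrElse(0L)
    println("memory: %d of %d bytes free; disk used: %d bytes"
      .format(remainingMem, maxMem, diskSpaceUsed))
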
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index a10e3a95c6..d6e33c8619 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -56,9 +56,11 @@ object StorageUtils {
// Find the id of the RDD, e.g. rdd_1 => 1
val rddId = rddKey.split("_").last.toInt
// Get the friendly name for the rdd, if available.
- val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
- val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-
+ val rdd = sc.persistentRdds(rddId)
+ val rddName = Option(rdd.name).getOrElse(rddKey)
+ val rddStorageLevel = rdd.getStorageLevel
+ // TODO: get the total number of partitions in the RDD
+
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
}.toArray
}
@@ -75,4 +77,4 @@ object StorageUtils {
}
-}
\ No newline at end of file
+}
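
End to end, the per-RDD page above amounts to two StorageUtils calls: filter each slave's block report down to the "rdd_<id>" key prefix, then fold the surviving blocks into an RDDInfo. The same pair works outside the UI; a sketch (the RDD id is illustrative, and headOption is used here instead of the UI's head to avoid an exception when nothing matches the prefix):

    val rddId = 1  // illustrative; any persisted RDD's id
    val prefix = "rdd_" + rddId
    val filtered = StorageUtils.filterStorageStatusByPrefix(sc.getSlavesStorageStatus, prefix)
    val info: Option[RDDInfo] = StorageUtils.rddInfoFromStorageStatus(filtered, sc).headOption
    info.foreach(println)
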