From 531ac136bf4ed333cb906ac229d986605a8207a6 Mon Sep 17 00:00:00 2001 From: Denny Date: Mon, 29 Oct 2012 14:53:47 -0700 Subject: BlockManager UI. --- core/src/main/scala/spark/RDD.scala | 8 ++ core/src/main/scala/spark/SparkContext.scala | 10 ++ .../scala/spark/storage/BlockManagerMaster.scala | 33 ++++++- .../main/scala/spark/storage/BlockManagerUI.scala | 102 +++++++++++++++++++++ core/src/main/scala/spark/util/AkkaUtils.scala | 5 +- core/src/main/twirl/spark/common/layout.scala.html | 35 +++++++ .../twirl/spark/deploy/common/layout.scala.html | 35 ------- .../twirl/spark/deploy/master/index.scala.html | 2 +- .../spark/deploy/master/job_details.scala.html | 2 +- .../twirl/spark/deploy/worker/index.scala.html | 2 +- core/src/main/twirl/spark/storage/index.scala.html | 28 ++++++ core/src/main/twirl/spark/storage/rdd.scala.html | 65 +++++++++++++ .../main/twirl/spark/storage/rdd_row.scala.html | 18 ++++ .../main/twirl/spark/storage/rdd_table.scala.html | 18 ++++ 14 files changed, 318 insertions(+), 45 deletions(-) create mode 100644 core/src/main/scala/spark/storage/BlockManagerUI.scala create mode 100644 core/src/main/twirl/spark/common/layout.scala.html delete mode 100644 core/src/main/twirl/spark/deploy/common/layout.scala.html create mode 100644 core/src/main/twirl/spark/storage/index.scala.html create mode 100644 core/src/main/twirl/spark/storage/rdd.scala.html create mode 100644 core/src/main/twirl/spark/storage/rdd_row.scala.html create mode 100644 core/src/main/twirl/spark/storage/rdd_table.scala.html diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 338dff4061..dc757dc6aa 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -107,6 +107,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial // Variables relating to persistence private var storageLevel: StorageLevel = StorageLevel.NONE + /* Assign a name to this RDD */ + def name(name: String) = { + sc.rddNames(this.id) = name + this + } + /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. Can only be called once on each RDD. 
@@ -118,6 +124,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial "Cannot change storage level of an RDD after it was already assigned a level") } storageLevel = newLevel + // Register the RDD with the SparkContext + sc.persistentRdds(id) = this this } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d26cccbfe1..71c9dcd017 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,6 +1,7 @@ package spark import java.io._ +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger import java.net.{URI, URLClassLoader} @@ -102,10 +103,19 @@ class SparkContext( isLocal) SparkEnv.set(env) + // Start the BlockManager UI + spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem, + SparkEnv.get.blockManager.master.masterActor, this) + // Used to store a URL for each static file/jar together with the file's local timestamp private[spark] val addedFiles = HashMap[String, Long]() private[spark] val addedJars = HashMap[String, Long]() + // Keeps track of all persisted RDDs + private[spark] val persistentRdds = new ConcurrentHashMap[Int, RDD[_]]() + // A HashMap for friendly RDD Names + private[spark] val rddNames = new ConcurrentHashMap[Int, String]() + // Add each JAR given through the constructor jars.foreach { addJar(_) } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index ace27e758c..d12a16869a 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -3,7 +3,8 @@ package spark.storage import java.io._ import java.util.{HashMap => JHashMap} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.util.Random import akka.actor._ @@ -90,6 +91,15 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster private[spark] case object GetMemoryStatus extends ToBlockManagerMaster +private[spark] +case class GetStorageStatus extends ToBlockManagerMaster + +private[spark] +case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) + +private[spark] +case class StorageStatus(maxMem: Long, remainingMem: Long, blocks: Map[String, BlockStatus]) + private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { @@ -99,7 +109,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val maxMem: Long) { private var _lastSeenMs = timeMs private var _remainingMem = maxMem - private val _blocks = new JHashMap[String, StorageLevel] + + private val _blocks = new JHashMap[String, BlockStatus] logInfo("Registering block manager %s:%d with %s RAM".format( blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) @@ -115,7 +126,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (_blocks.containsKey(blockId)) { // The block exists on the slave already. 
- val originalLevel: StorageLevel = _blocks.get(blockId) + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel if (originalLevel.useMemory) { _remainingMem += memSize @@ -124,7 +135,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (storageLevel.isValid) { // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, storageLevel) + _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) if (storageLevel.useMemory) { _remainingMem -= memSize logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( @@ -137,7 +148,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. - val originalLevel: StorageLevel = _blocks.get(blockId) + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel _blocks.remove(blockId) if (originalLevel.useMemory) { _remainingMem += memSize @@ -152,6 +163,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } + def blocks: JHashMap[String, BlockStatus] = _blocks + def remainingMem: Long = _remainingMem def lastSeenMs: Long = _lastSeenMs @@ -198,6 +211,9 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case GetMemoryStatus => getMemoryStatus + case GetStorageStatus => + getStorageStatus + case RemoveHost(host) => removeHost(host) sender ! true @@ -219,6 +235,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor sender ! res } + private def getStorageStatus() { + val res = blockManagerInfo.map { case(blockManagerId, info) => + StorageStatus(info.maxMem, info.remainingMem, info.blocks.asScala) + } + sender ! res + } + private def register(blockManagerId: BlockManagerId, maxMemSize: Long) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala new file mode 100644 index 0000000000..c168f60c35 --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -0,0 +1,102 @@ +package spark.storage + +import akka.actor.{ActorRef, ActorSystem} +import akka.dispatch.Await +import akka.pattern.ask +import akka.util.Timeout +import akka.util.duration._ +import cc.spray.Directives +import cc.spray.directives._ +import cc.spray.typeconversion.TwirlSupport._ +import scala.collection.mutable.ArrayBuffer +import spark.{Logging, SparkContext, SparkEnv} +import spark.util.AkkaUtils + +private[spark] +object BlockManagerUI extends Logging { + + /* Starts the Web interface for the BlockManager */ + def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) { + val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc) + try { + logInfo("Starting BlockManager WebUI.") + val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt + AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, webUIDirectives.handler, "BlockManagerHTTPServer") + } catch { + case e: Exception => + logError("Failed to create BlockManager WebUI", e) + System.exit(1) + } + } + +} + +private[spark] +case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, numPartitions: Int, memSize: Long, diskSize: Long) + +private[spark] +class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, sc: SparkContext) extends Directives { + + val 
STATIC_RESOURCE_DIR = "spark/deploy/static" + implicit val timeout = Timeout(1 seconds) + + val handler = { + + get { path("") { completeWith { + // Request the current storage status from the Master + val future = master ? GetStorageStatus + future.map { status => + val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]] + + // Calculate macro-level statistics + val maxMem = storageStati.map(_.maxMem).reduce(_+_) + val remainingMem = storageStati.map(_.remainingMem).reduce(_+_) + val diskSpaceUsed = storageStati.flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_+_).getOrElse(0L) + + // Filter out everything that's not and rdd. + val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith("rdd") }.toMap + val rdds = rddInfoFromBlockStati(rddBlocks) + + spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds.toList) + } + }}} ~ + get { path("rdd") { parameter("id") { id => { completeWith { + val future = master ? GetStorageStatus + future.map { status => + val prefix = "rdd_" + id.toString + + val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]] + val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith(prefix) }.toMap + val rddInfo = rddInfoFromBlockStati(rddBlocks).first + + spark.storage.html.rdd.render(rddInfo, rddBlocks) + + } + }}}}} ~ + pathPrefix("static") { + getFromResourceDirectory(STATIC_RESOURCE_DIR) + } + + } + + private def rddInfoFromBlockStati(infos: Map[String, BlockStatus]) : Array[RDDInfo] = { + infos.groupBy { case(k,v) => + // Group by rdd name, ignore the partition name + k.substring(0,k.lastIndexOf('_')) + }.map { case(k,v) => + val blockStati = v.map(_._2).toArray + // Add up memory and disk sizes + val tmp = blockStati.map { x => (x.memSize, x.diskSize)}.reduce { (x,y) => + (x._1 + y._1, x._2 + y._2) + } + // Get the friendly name for the rdd, if available. + // This is pretty hacky, is there a better way? + val rddId = k.split("_").last.toInt + val rddName : String = Option(sc.rddNames.get(rddId)).getOrElse(k) + val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel + RDDInfo(rddId, rddName, rddStorageLevel, blockStati.length, tmp._1, tmp._2) + }.toArray + } + +} diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index b466b5239c..13bc0f8ccc 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -50,12 +50,13 @@ private[spark] object AkkaUtils { * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to * handle requests. Throws a SparkException if this fails. 
*/ - def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) { + def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, + name: String = "HttpServer") { val ioWorker = new IoWorker(actorSystem).start() val httpService = actorSystem.actorOf(Props(new HttpService(route))) val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService))) val server = actorSystem.actorOf( - Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer") + Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = name) actorSystem.registerOnTermination { ioWorker.stop() } val timeout = 3.seconds val future = server.ask(HttpServer.Bind(ip, port))(timeout) diff --git a/core/src/main/twirl/spark/common/layout.scala.html b/core/src/main/twirl/spark/common/layout.scala.html new file mode 100644 index 0000000000..b9192060aa --- /dev/null +++ b/core/src/main/twirl/spark/common/layout.scala.html @@ -0,0 +1,35 @@ +@(title: String)(content: Html) + + + + + + + + + + @title + + + + +
+ + +
+
+ +

@title

+
+
+ +
+ + @content + +
+ + + \ No newline at end of file diff --git a/core/src/main/twirl/spark/deploy/common/layout.scala.html b/core/src/main/twirl/spark/deploy/common/layout.scala.html deleted file mode 100644 index b9192060aa..0000000000 --- a/core/src/main/twirl/spark/deploy/common/layout.scala.html +++ /dev/null @@ -1,35 +0,0 @@ -@(title: String)(content: Html) - - - - - - - - - - @title - - - - -
- - -
-
- -

@title

-
-
- -
- - @content - -
- - - \ No newline at end of file diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html index 7562076b00..2e15fe2200 100644 --- a/core/src/main/twirl/spark/deploy/master/index.scala.html +++ b/core/src/main/twirl/spark/deploy/master/index.scala.html @@ -1,7 +1,7 @@ @(state: spark.deploy.MasterState) @import spark.deploy.master._ -@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) { +@spark.common.html.layout(title = "Spark Master on " + state.uri) {
diff --git a/core/src/main/twirl/spark/deploy/master/job_details.scala.html b/core/src/main/twirl/spark/deploy/master/job_details.scala.html index dcf41c28f2..d02a51b214 100644 --- a/core/src/main/twirl/spark/deploy/master/job_details.scala.html +++ b/core/src/main/twirl/spark/deploy/master/job_details.scala.html @@ -1,6 +1,6 @@ @(job: spark.deploy.master.JobInfo) -@spark.deploy.common.html.layout(title = "Job Details") { +@spark.common.html.layout(title = "Job Details") {
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html index 69746ed02c..40c2d81d77 100644 --- a/core/src/main/twirl/spark/deploy/worker/index.scala.html +++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html @@ -1,6 +1,6 @@ @(worker: spark.deploy.WorkerState) -@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) { +@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
diff --git a/core/src/main/twirl/spark/storage/index.scala.html b/core/src/main/twirl/spark/storage/index.scala.html new file mode 100644 index 0000000000..fa7dad51ee --- /dev/null +++ b/core/src/main/twirl/spark/storage/index.scala.html @@ -0,0 +1,28 @@ +@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: List[spark.storage.RDDInfo]) + +@spark.common.html.layout(title = "Storage Dashboard") { + + +
+
+
    +
  • Memory: + @{spark.Utils.memoryBytesToString(maxMem - remainingMem)} Used + (@{spark.Utils.memoryBytesToString(remainingMem)} Available)
  • +
  • Disk: @{spark.Utils.memoryBytesToString(diskSpaceUsed)} Used
  • +
+
+
+ +
+ + +
+
+

RDD Summary

+
+ @rdd_table(rdds) +
+
+ +} \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html new file mode 100644 index 0000000000..3a70326efe --- /dev/null +++ b/core/src/main/twirl/spark/storage/rdd.scala.html @@ -0,0 +1,65 @@ +@(rddInfo: spark.storage.RDDInfo, blocks: Map[String, spark.storage.BlockStatus]) + +@spark.common.html.layout(title = "RDD Info ") { + + +
+
+
    +
  • + Storage Level: + @(if (rddInfo.storageLevel.useDisk) "Disk" else "") + @(if (rddInfo.storageLevel.useMemory) "Memory" else "") + @(if (rddInfo.storageLevel.deserialized) "Deserialized" else "") + @(rddInfo.storageLevel.replication)x Replicated +
  • + Partitions: + @(rddInfo.numPartitions) +
  • +
  • + Memory Size: + @{spark.Utils.memoryBytesToString(rddInfo.memSize)} +
  • +
  • + Disk Size: + @{spark.Utils.memoryBytesToString(rddInfo.diskSize)} +
  • +
+
+
+ +
+ + +
+
+

RDD Summary

+
+ + + + + + + + + + + + + @blocks.map { case (k,v) => + + + + + + + } + +
Block NameStorage LevelSize in MemorySize on Disk
@k@v.storageLevel@{spark.Utils.memoryBytesToString(v.memSize)}@{spark.Utils.memoryBytesToString(v.diskSize)}
+ + +
+
+ +} \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd_row.scala.html b/core/src/main/twirl/spark/storage/rdd_row.scala.html new file mode 100644 index 0000000000..3dd9944e3b --- /dev/null +++ b/core/src/main/twirl/spark/storage/rdd_row.scala.html @@ -0,0 +1,18 @@ +@(rdd: spark.storage.RDDInfo) + + + + + @rdd.name + + + + @(if (rdd.storageLevel.useDisk) "Disk" else "") + @(if (rdd.storageLevel.useMemory) "Memory" else "") + @(if (rdd.storageLevel.deserialized) "Deserialized" else "") + @(rdd.storageLevel.replication)x Replicated + + @rdd.numPartitions + @{spark.Utils.memoryBytesToString(rdd.memSize)} + @{spark.Utils.memoryBytesToString(rdd.diskSize)} + \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html new file mode 100644 index 0000000000..24f55ccefb --- /dev/null +++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html @@ -0,0 +1,18 @@ +@(rdds: List[spark.storage.RDDInfo]) + + + + + + + + + + + + + @for(rdd <- rdds) { + @rdd_row(rdd) + } + +
RDD NameStorage LevelPartitionsSize in MemorySize on Disk
\ No newline at end of file -- cgit v1.2.3 From eb95212f4d24dbcd734922f39d51e6fdeaeb4c8b Mon Sep 17 00:00:00 2001 From: Denny Date: Mon, 29 Oct 2012 14:57:32 -0700 Subject: code Formatting --- core/src/main/scala/spark/storage/BlockManagerUI.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index c168f60c35..635c096c87 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -21,7 +21,8 @@ object BlockManagerUI extends Logging { try { logInfo("Starting BlockManager WebUI.") val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt - AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, webUIDirectives.handler, "BlockManagerHTTPServer") + AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, + webUIDirectives.handler, "BlockManagerHTTPServer") } catch { case e: Exception => logError("Failed to create BlockManager WebUI", e) @@ -32,10 +33,12 @@ object BlockManagerUI extends Logging { } private[spark] -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, numPartitions: Int, memSize: Long, diskSize: Long) +case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, + numPartitions: Int, memSize: Long, diskSize: Long) private[spark] -class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, sc: SparkContext) extends Directives { +class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, + sc: SparkContext) extends Directives { val STATIC_RESOURCE_DIR = "spark/deploy/static" implicit val timeout = Timeout(1 seconds) @@ -55,7 +58,9 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, s .reduceOption(_+_).getOrElse(0L) // Filter out everything that's not and rdd. 
- val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith("rdd") }.toMap + val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => + k.startsWith("rdd") + }.toMap val rdds = rddInfoFromBlockStati(rddBlocks) spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds.toList) @@ -67,7 +72,9 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, s val prefix = "rdd_" + id.toString val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]] - val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith(prefix) }.toMap + val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => + k.startsWith(prefix) + }.toMap val rddInfo = rddInfoFromBlockStati(rddBlocks).first spark.storage.html.rdd.render(rddInfo, rddBlocks) -- cgit v1.2.3 From ceec1a1a6abb1fd03316e7fcc532d7e121d5bf65 Mon Sep 17 00:00:00 2001 From: Denny Date: Mon, 29 Oct 2012 15:03:01 -0700 Subject: Nicer storage level format on RDD page --- core/src/main/twirl/spark/storage/rdd.scala.html | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html index 3a70326efe..075289c826 100644 --- a/core/src/main/twirl/spark/storage/rdd.scala.html +++ b/core/src/main/twirl/spark/storage/rdd.scala.html @@ -50,7 +50,12 @@ @blocks.map { case (k,v) => @k - @v.storageLevel + + @(if (v.storageLevel.useDisk) "Disk" else "") + @(if (v.storageLevel.useMemory) "Memory" else "") + @(if (v.storageLevel.deserialized) "Deserialized" else "") + @(v.storageLevel.replication)x Replicated + @{spark.Utils.memoryBytesToString(v.memSize)} @{spark.Utils.memoryBytesToString(v.diskSize)} -- cgit v1.2.3 From 4a1be7e0dbf0031d85b91dc1132fe101d87ba097 Mon Sep 17 00:00:00 2001 From: Denny Date: Mon, 12 Nov 2012 10:56:35 -0800 Subject: Refactor BlockManager UI and adding worker details. --- core/src/main/scala/spark/RDD.scala | 7 +- core/src/main/scala/spark/SparkContext.scala | 2 - .../scala/spark/storage/BlockManagerMaster.scala | 11 ++- .../main/scala/spark/storage/BlockManagerUI.scala | 51 ++++---------- .../main/scala/spark/storage/StorageLevel.scala | 9 +++ .../main/scala/spark/storage/StorageUtils.scala | 78 ++++++++++++++++++++++ core/src/main/twirl/spark/storage/index.scala.html | 22 ++++-- core/src/main/twirl/spark/storage/rdd.scala.html | 35 ++++++---- .../main/twirl/spark/storage/rdd_row.scala.html | 18 ----- .../main/twirl/spark/storage/rdd_table.scala.html | 16 ++++- .../twirl/spark/storage/worker_table.scala.html | 24 +++++++ 11 files changed, 186 insertions(+), 87 deletions(-) create mode 100644 core/src/main/scala/spark/storage/StorageUtils.scala delete mode 100644 core/src/main/twirl/spark/storage/rdd_row.scala.html create mode 100644 core/src/main/twirl/spark/storage/worker_table.scala.html diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index dc757dc6aa..3669bda2d2 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -86,6 +86,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial @transient val dependencies: List[Dependency[_]] // Methods available on all RDDs: + + // A friendly name for this RDD + var name: String = null /** Record user function generating this RDD. 
*/ private[spark] val origin = Utils.getSparkCallSite @@ -108,8 +111,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial private var storageLevel: StorageLevel = StorageLevel.NONE /* Assign a name to this RDD */ - def name(name: String) = { - sc.rddNames(this.id) = name + def setName(_name: String) = { + name = _name this } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 71c9dcd017..7ea0f6f9e0 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -113,8 +113,6 @@ class SparkContext( // Keeps track of all persisted RDDs private[spark] val persistentRdds = new ConcurrentHashMap[Int, RDD[_]]() - // A HashMap for friendly RDD Names - private[spark] val rddNames = new ConcurrentHashMap[Int, String]() // Add each JAR given through the constructor jars.foreach { addJar(_) } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 3fc9b629c1..beafdda9d1 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -4,7 +4,7 @@ import java.io._ import java.util.{HashMap => JHashMap} import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import akka.actor._ @@ -95,10 +95,7 @@ private[spark] case class GetStorageStatus extends ToBlockManagerMaster private[spark] -case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) - -private[spark] -case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, remainingMem: Long, blocks: Map[String, BlockStatus]) +case class BlockStatus(blockManagerId: BlockManagerId, storageLevel: StorageLevel, memSize: Long, diskSize: Long) private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { @@ -135,7 +132,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (storageLevel.isValid) { // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) + _blocks.put(blockId, BlockStatus(blockManagerId, storageLevel, memSize, diskSize)) if (storageLevel.useMemory) { _remainingMem -= memSize logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( @@ -237,7 +234,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor private def getStorageStatus() { val res = blockManagerInfo.map { case(blockManagerId, info) => - StorageStatus(blockManagerId, info.maxMem, info.remainingMem, info.blocks.asScala) + StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap) } sender ! 
res } diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 635c096c87..35cbd59280 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -12,6 +12,7 @@ import scala.collection.mutable.ArrayBuffer import spark.{Logging, SparkContext, SparkEnv} import spark.util.AkkaUtils + private[spark] object BlockManagerUI extends Logging { @@ -32,9 +33,6 @@ object BlockManagerUI extends Logging { } -private[spark] -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numPartitions: Int, memSize: Long, diskSize: Long) private[spark] class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, @@ -49,21 +47,17 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, // Request the current storage status from the Master val future = master ? GetStorageStatus future.map { status => - val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]] + val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray // Calculate macro-level statistics - val maxMem = storageStati.map(_.maxMem).reduce(_+_) - val remainingMem = storageStati.map(_.remainingMem).reduce(_+_) - val diskSpaceUsed = storageStati.flatMap(_.blocks.values.map(_.diskSize)) + val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) + val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) .reduceOption(_+_).getOrElse(0L) - // Filter out everything that's not and rdd. - val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => - k.startsWith("rdd") - }.toMap - val rdds = rddInfoFromBlockStati(rddBlocks) + val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds.toList) + spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) } }}} ~ get { path("rdd") { parameter("id") { id => { completeWith { @@ -71,13 +65,13 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, future.map { status => val prefix = "rdd_" + id.toString - val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]] - val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => - k.startsWith(prefix) - }.toMap - val rddInfo = rddInfoFromBlockStati(rddBlocks).first - spark.storage.html.rdd.render(rddInfo, rddBlocks) + val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray + val filteredStorageStatusList = StorageUtils.filterStorageStatusByPrefix(storageStatusList, prefix) + + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).first + + spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) } }}}}} ~ @@ -87,23 +81,6 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, } - private def rddInfoFromBlockStati(infos: Map[String, BlockStatus]) : Array[RDDInfo] = { - infos.groupBy { case(k,v) => - // Group by rdd name, ignore the partition name - k.substring(0,k.lastIndexOf('_')) - }.map { case(k,v) => - val blockStati = v.map(_._2).toArray - // Add up memory and disk sizes - val tmp = blockStati.map { x => (x.memSize, x.diskSize)}.reduce { (x,y) => - (x._1 + y._1, x._2 + y._2) - } - // Get the friendly name for the rdd, if available. - // This is pretty hacky, is there a better way? 
- val rddId = k.split("_").last.toInt - val rddName : String = Option(sc.rddNames.get(rddId)).getOrElse(k) - val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel - RDDInfo(rddId, rddName, rddStorageLevel, blockStati.length, tmp._1, tmp._2) - }.toArray - } + } diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index c497f03e0c..97d8c7566d 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -68,6 +68,15 @@ class StorageLevel( override def toString: String = "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) + + def description : String = { + var result = "" + result += (if (useDisk) "Disk " else "") + result += (if (useMemory) "Memory " else "") + result += (if (deserialized) "Deserialized " else "Serialized") + result += "%sx Replicated".format(replication) + result + } } object StorageLevel { diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala new file mode 100644 index 0000000000..ebc7390ee5 --- /dev/null +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -0,0 +1,78 @@ +package spark.storage + +import spark.SparkContext + +private[spark] +case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, + blocks: Map[String, BlockStatus]) { + + def memUsed(blockPrefix: String = "") = { + blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize). + reduceOption(_+_).getOrElse(0l) + } + + def diskUsed(blockPrefix: String = "") = { + blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize). + reduceOption(_+_).getOrElse(0l) + } + + def memRemaining : Long = maxMem - memUsed() + +} + +case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, + numPartitions: Int, memSize: Long, diskSize: Long, locations: Array[BlockManagerId]) + + +/* Helper methods for storage-related objects */ +private[spark] +object StorageUtils { + + /* Given the current storage status of the BlockManager, returns information for each RDD */ + def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], + sc: SparkContext) : Array[RDDInfo] = { + rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) + } + + /* Given a list of BlockStatus objets, returns information for each RDD */ + def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], + sc: SparkContext) : Array[RDDInfo] = { + // Find all RDD Blocks (ignore broadcast variables) + val rddBlocks = infos.filterKeys(_.startsWith("rdd")) + + // Group by rddId, ignore the partition name + val groupedRddBlocks = infos.groupBy { case(k, v) => + k.substring(0,k.lastIndexOf('_')) + }.mapValues(_.values.toArray) + + // For each RDD, generate an RDDInfo object + groupedRddBlocks.map { case(rddKey, rddBlocks) => + + // Add up memory and disk sizes + val memSize = rddBlocks.map(_.memSize).reduce(_ + _) + val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _) + + // Find the id of the RDD, e.g. rdd_1 => 1 + val rddId = rddKey.split("_").last.toInt + // Get the friendly name for the rdd, if available. 
+ val rddName = Option(sc.persistentRdds.get(rddId).name).getOrElse(rddKey) + val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel + + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize, + rddBlocks.map(_.blockManagerId)) + }.toArray + } + + /* Removes all BlockStatus object that are not part of a block prefix */ + def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], + prefix: String) : Array[StorageStatus] = { + + storageStatusList.map { status => + val newBlocks = status.blocks.filterKeys(_.startsWith(prefix)) + //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _) + StorageStatus(status.blockManagerId, status.maxMem, newBlocks) + } + + } + +} \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/index.scala.html b/core/src/main/twirl/spark/storage/index.scala.html index fa7dad51ee..2b337f6133 100644 --- a/core/src/main/twirl/spark/storage/index.scala.html +++ b/core/src/main/twirl/spark/storage/index.scala.html @@ -1,4 +1,5 @@ -@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: List[spark.storage.RDDInfo]) +@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: Array[spark.storage.RDDInfo], storageStatusList: Array[spark.storage.StorageStatus]) +@import spark.Utils @spark.common.html.layout(title = "Storage Dashboard") { @@ -7,16 +8,16 @@
  • Memory: - @{spark.Utils.memoryBytesToString(maxMem - remainingMem)} Used - (@{spark.Utils.memoryBytesToString(remainingMem)} Available)
  • -
  • Disk: @{spark.Utils.memoryBytesToString(diskSpaceUsed)} Used
  • + @{Utils.memoryBytesToString(maxMem - remainingMem)} Used + (@{Utils.memoryBytesToString(remainingMem)} Available) +
  • Disk: @{Utils.memoryBytesToString(diskSpaceUsed)} Used

- +

RDD Summary

@@ -25,4 +26,15 @@
+
+ + +
+
+

Worker Summary

+
+ @worker_table(storageStatusList) +
+
+ } \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html index 075289c826..ac7f8c981f 100644 --- a/core/src/main/twirl/spark/storage/rdd.scala.html +++ b/core/src/main/twirl/spark/storage/rdd.scala.html @@ -1,4 +1,5 @@ -@(rddInfo: spark.storage.RDDInfo, blocks: Map[String, spark.storage.BlockStatus]) +@(rddInfo: spark.storage.RDDInfo, storageStatusList: Array[spark.storage.StorageStatus]) +@import spark.Utils @spark.common.html.layout(title = "RDD Info ") { @@ -8,21 +9,18 @@
  • Storage Level: - @(if (rddInfo.storageLevel.useDisk) "Disk" else "") - @(if (rddInfo.storageLevel.useMemory) "Memory" else "") - @(if (rddInfo.storageLevel.deserialized) "Deserialized" else "") - @(rddInfo.storageLevel.replication)x Replicated + @(rddInfo.storageLevel.description)
  • Partitions: @(rddInfo.numPartitions)
  • Memory Size: - @{spark.Utils.memoryBytesToString(rddInfo.memSize)} + @{Utils.memoryBytesToString(rddInfo.memSize)}
  • Disk Size: - @{spark.Utils.memoryBytesToString(rddInfo.diskSize)} + @{Utils.memoryBytesToString(rddInfo.diskSize)}
@@ -36,6 +34,7 @@

RDD Summary


+ @@ -47,17 +46,14 @@ - @blocks.map { case (k,v) => + @storageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { case (k,v) => - - + + } @@ -67,4 +63,15 @@ +
+ + +
+
+

Worker Summary

+
+ @worker_table(storageStatusList, "rdd_" + rddInfo.id ) +
+
+ } \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd_row.scala.html b/core/src/main/twirl/spark/storage/rdd_row.scala.html deleted file mode 100644 index 3dd9944e3b..0000000000 --- a/core/src/main/twirl/spark/storage/rdd_row.scala.html +++ /dev/null @@ -1,18 +0,0 @@ -@(rdd: spark.storage.RDDInfo) - - - - - - - - \ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html index 24f55ccefb..af801cf229 100644 --- a/core/src/main/twirl/spark/storage/rdd_table.scala.html +++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html @@ -1,4 +1,5 @@ -@(rdds: List[spark.storage.RDDInfo]) +@(rdds: Array[spark.storage.RDDInfo]) +@import spark.Utils
@k - @(if (v.storageLevel.useDisk) "Disk" else "") - @(if (v.storageLevel.useMemory) "Memory" else "") - @(if (v.storageLevel.deserialized) "Deserialized" else "") - @(v.storageLevel.replication)x Replicated + @(v.storageLevel.description) @{spark.Utils.memoryBytesToString(v.memSize)}@{spark.Utils.memoryBytesToString(v.diskSize)}@{Utils.memoryBytesToString(v.memSize)}@{Utils.memoryBytesToString(v.diskSize)}
- - @rdd.name - - - @(if (rdd.storageLevel.useDisk) "Disk" else "") - @(if (rdd.storageLevel.useMemory) "Memory" else "") - @(if (rdd.storageLevel.deserialized) "Deserialized" else "") - @(rdd.storageLevel.replication)x Replicated - @rdd.numPartitions@{spark.Utils.memoryBytesToString(rdd.memSize)}@{spark.Utils.memoryBytesToString(rdd.diskSize)}
@@ -12,7 +13,18 @@ @for(rdd <- rdds) { - @rdd_row(rdd) + + + + + + + }
+ + @rdd.name + + @(rdd.storageLevel.description) + @rdd.numPartitions@{Utils.memoryBytesToString(rdd.memSize)}@{Utils.memoryBytesToString(rdd.diskSize)}
\ No newline at end of file diff --git a/core/src/main/twirl/spark/storage/worker_table.scala.html b/core/src/main/twirl/spark/storage/worker_table.scala.html new file mode 100644 index 0000000000..d54b8de4cc --- /dev/null +++ b/core/src/main/twirl/spark/storage/worker_table.scala.html @@ -0,0 +1,24 @@ +@(workersStatusList: Array[spark.storage.StorageStatus], prefix: String = "") +@import spark.Utils + + + + + + + + + + + @for(status <- workersStatusList) { + + + + + + } + +
HostMemory UsageDisk Usage
@(status.blockManagerId.ip + ":" + status.blockManagerId.port) + @(Utils.memoryBytesToString(status.memUsed(prefix))) + (@(Utils.memoryBytesToString(status.memRemaining)) Total Available) + @(Utils.memoryBytesToString(status.diskUsed(prefix)))
\ No newline at end of file -- cgit v1.2.3 From 50e2b23927956c14db40093d31bc80892764006a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 22 Jan 2013 09:27:33 -0800 Subject: Fix up some problems from the merge --- .../main/scala/spark/storage/BlockManagerMasterActor.scala | 11 +++++++++++ core/src/main/scala/spark/storage/BlockManagerMessages.scala | 3 +++ core/src/main/scala/spark/storage/StorageUtils.scala | 8 ++++---- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index f4d026da33..c945c34c71 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -68,6 +68,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { case GetMemoryStatus => getMemoryStatus + case GetStorageStatus => + getStorageStatus + case RemoveBlock(blockId) => removeBlock(blockId) @@ -177,6 +180,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! res } + private def getStorageStatus() { + val res = blockManagerInfo.map { case(blockManagerId, info) => + import collection.JavaConverters._ + StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap) + } + sender ! res + } + private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index d73a9b790f..3a381fd385 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -100,3 +100,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster private[spark] case object ExpireDeadHosts extends ToBlockManagerMaster + +private[spark] +case object GetStorageStatus extends ToBlockManagerMaster \ No newline at end of file diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index ebc7390ee5..63ad5c125b 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -1,6 +1,7 @@ package spark.storage import spark.SparkContext +import BlockManagerMasterActor.BlockStatus private[spark] case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, @@ -20,8 +21,8 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numPartitions: Int, memSize: Long, diskSize: Long, locations: Array[BlockManagerId]) +case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, + numPartitions: Int, memSize: Long, diskSize: Long) /* Helper methods for storage-related objects */ @@ -58,8 +59,7 @@ object StorageUtils { val rddName = Option(sc.persistentRdds.get(rddId).name).getOrElse(rddKey) val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel - RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize, - rddBlocks.map(_.blockManagerId)) + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) }.toArray } -- cgit v1.2.3 From 539491bbc333834b9ae2721ae6cf3524cefb91ea Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 25 Jan 2013 09:29:59 -0800 
Subject: code reformatting --- core/src/main/scala/spark/RDD.scala | 4 ++-- core/src/main/scala/spark/storage/BlockManagerUI.scala | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 870cc5ca78..4fcab9279a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest]( /** How this RDD depends on any parent RDDs. */ protected def getDependencies(): List[Dependency[_]] = dependencies_ - // A friendly name for this RDD + /** A friendly name for this RDD */ var name: String = null /** Optionally overridden by subclasses to specify placement preferences. */ @@ -111,7 +111,7 @@ abstract class RDD[T: ClassManifest]( /** A unique ID for this RDD (within its SparkContext). */ val id = sc.newRddId() - /* Assign a name to this RDD */ + /** Assign a name to this RDD */ def setName(_name: String) = { name = _name this diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 35cbd59280..1003cc7a61 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -57,7 +57,8 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) + spark.storage.html.index. + render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) } }}} ~ get { path("rdd") { parameter("id") { id => { completeWith { @@ -67,9 +68,10 @@ class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray - val filteredStorageStatusList = StorageUtils.filterStorageStatusByPrefix(storageStatusList, prefix) + val filteredStorageStatusList = StorageUtils. + filterStorageStatusByPrefix(storageStatusList, prefix) - val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).first + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) -- cgit v1.2.3 From 1cadaa164e9f078e4ca483edb9db7fd5507c9e64 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 25 Jan 2013 09:30:21 -0800 Subject: switch to TimeStampedHashMap for storing persistent Rdds --- core/src/main/scala/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d994648899..10ceeb3028 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -44,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler} import spark.scheduler.local.LocalScheduler import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import util.TimeStampedHashMap /** * Main entry point for Spark functionality. 
A SparkContext represents the connection to a Spark @@ -110,7 +111,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new ConcurrentHashMap[Int, RDD[_]]() + private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() // Add each JAR given through the constructor jars.foreach { addJar(_) } -- cgit v1.2.3 From a1d9d1767d821c1e25e485e32d9356b12aba6a01 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 25 Jan 2013 10:05:26 -0800 Subject: fixup 1cadaa1, changed api of map --- core/src/main/scala/spark/storage/StorageUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index 63ad5c125b..a10e3a95c6 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -56,8 +56,8 @@ object StorageUtils { // Find the id of the RDD, e.g. rdd_1 => 1 val rddId = rddKey.split("_").last.toInt // Get the friendly name for the rdd, if available. - val rddName = Option(sc.persistentRdds.get(rddId).name).getOrElse(rddKey) - val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel + val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey) + val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) }.toArray -- cgit v1.2.3 From 49c05608f5f27354da120e2367b6d4a63ec38948 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 25 Jan 2013 17:04:16 -0800 Subject: add metadatacleaner for persisentRdd map --- core/src/main/scala/spark/SparkContext.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 10ceeb3028..bff54dbdd1 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -44,7 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler} import spark.scheduler.local.LocalScheduler import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import util.TimeStampedHashMap +import util.{MetadataCleaner, TimeStampedHashMap} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -113,6 +113,9 @@ class SparkContext( // Keeps track of all persisted RDDs private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() + private[spark] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) + + // Add each JAR given through the constructor jars.foreach { addJar(_) } @@ -512,6 +515,7 @@ class SparkContext( /** Shut down the SparkContext. */ def stop() { if (dagScheduler != null) { + metadataCleaner.cancel() dagScheduler.stop() dagScheduler = null taskScheduler = null @@ -654,6 +658,12 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + + private[spark] def cleanup(cleanupTime: Long) { + var sizeBefore = persistentRdds.size + persistentRdds.clearOldValues(cleanupTime) + logInfo("idToStage " + sizeBefore + " --> " + persistentRdds.size) + } } /** -- cgit v1.2.3
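      Usage sketch (illustrative, not taken from the patches themselves): with this series applied, an RDD gets a friendly name via setName and is registered with the storage UI simply by persisting it; the dashboard is then served on BLOCKMANAGER_UI_PORT (default 9080), with per-RDD detail at /rdd?id=<rdd id>. The driver program below is a hypothetical example -- the object name, the RDD name "numbers", and the "local" master URL are placeholders, not anything defined by the patches.

          import spark.SparkContext

          object BlockManagerUIExample {
            def main(args: Array[String]) {
              // Placeholder master URL and job name.
              val sc = new SparkContext("local", "BlockManagerUIExample")

              // setName() attaches the friendly name shown on the storage dashboard;
              // cache() persists the RDD, which registers it in sc.persistentRdds so the UI can find it.
              val numbers = sc.parallelize(1 to 100000).setName("numbers").cache()
              numbers.count()  // persistence is lazy, so run an action to actually cache the partitions

              // Browse http://<driver host>:9080/ for the dashboard and
              // http://<driver host>:9080/rdd?id=<rdd id> for a single RDD's blocks and workers.
              Thread.sleep(60 * 1000)  // keep the context alive long enough to look at the UI
              sc.stop()
            }
          }
      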