aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/BlockManagerUI.scala
blob: 631455abcd008e62cf6587aa66a915efd06ba9c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package spark.storage

import akka.actor.{ActorRef, ActorSystem}

import akka.util.Timeout
import scala.concurrent.duration._
import spray.httpx.TwirlSupport._
import spray.routing.Directives

import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils


/**
 * Web UI server for the BlockManager inside each SparkContext.
 *
 * Serves three HTTP routes:
 *   - "/"          : cluster-wide storage summary (memory/disk usage, cached RDDs)
 *   - "/rdd?id=N"  : block-level storage detail for a single RDD
 *   - "/static/**" : static assets bundled on the classpath
 */
private[spark]
class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
  extends Directives with Logging {

  implicit val implicitActorSystem = actorSystem
  val STATIC_RESOURCE_DIR          = "spark/deploy/static"

  // Ask-timeout used implicitly by Akka calls; configurable in seconds via spark.akka.askTimeout.
  implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
  val host = Utils.localHostName()
  // Honor an explicitly configured UI port, otherwise pick a free local one.
  val port = if (System.getProperty("spark.ui.port") != null) {
    System.getProperty("spark.ui.port").toInt
  } else {
    // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
    // random port it bound to, so we have to try to find a local one by creating a socket.
    Utils.findFreePort()
  }

  /**
   * Start a HTTP server to run the Web interface.
   * Exits the JVM if the server cannot be created, since the UI is started once per context.
   */
  def start() {
    try {
      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
      logInfo("Started BlockManager web UI at http://%s:%d".format(host, port))
    } catch {
      case e: Exception =>
        logError("Failed to create BlockManager WebUI", e)
        System.exit(1)
    }
  }

  /** Spray route that handles all UI requests (queried lazily on each GET). */
  val handler = {
    get {
      path("") {
        complete {
          // Request the current storage status from the Master
          val storageStatusList = sc.getExecutorStorageStatus
          // Calculate macro-level statistics. Use sum instead of reduce so an
          // empty executor list renders zeros rather than throwing
          // UnsupportedOperationException (matches diskSpaceUsed's 0L fallback).
          val maxMem = storageStatusList.map(_.maxMem).sum
          val remainingMem = storageStatusList.map(_.memRemaining).sum
          val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).sum
          val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
          spark.storage.html.index.
            render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
        }
      } ~
      path("rdd") {
        parameter("id") { id =>
          complete {
            // "id" is already a String; RDD block names are keyed as "rdd_<id>".
            val prefix = "rdd_" + id
            val storageStatusList = sc.getExecutorStorageStatus
            val filteredStorageStatusList = StorageUtils.
              filterStorageStatusByPrefix(storageStatusList, prefix)
            val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
            spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
          }
        }
      } ~
      pathPrefix("static") {
        getFromResourceDirectory(STATIC_RESOURCE_DIR)
      }
    }
  }

  /** Externally visible address of this UI, e.g. "http://host:port". */
  private[spark] def appUIAddress = "http://" + host + ":" + port
}