author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-27 23:56:14 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-27 23:56:14 -0800
commit    f03d9760fd8ac67fd0865cb355ba75d2eff507fe (patch)
tree      2faf4f2f6ff16102017dd2e280ec07738fc05680 /core
parent    909850729ec59b788645575fdc03df7cc51fe42b (diff)
Clean up BlockManagerUI a little (make it not be an object, merge with
Directives, and bind to a random port)
Diffstat (limited to 'core')
 core/src/main/scala/spark/SparkContext.scala              |   7
 core/src/main/scala/spark/Utils.scala                     |  17
 core/src/main/scala/spark/deploy/master/MasterWebUI.scala |   6
 core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala |   6
 core/src/main/scala/spark/storage/BlockManagerUI.scala    | 120
 core/src/main/scala/spark/util/AkkaUtils.scala            |   6
 core/src/main/scala/spark/util/MetadataCleaner.scala      |   3
 7 files changed, 91 insertions(+), 74 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 39721b47ae..77036c1275 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -44,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
import util.{MetadataCleaner, TimeStampedHashMap}
/**
@@ -88,8 +89,9 @@ class SparkContext(
SparkEnv.set(env)
// Start the BlockManager UI
- spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem,
- SparkEnv.get.blockManager.master.masterActor, this)
+ private[spark] val ui = new BlockManagerUI(
+ env.actorSystem, env.blockManager.master.masterActor, this)
+ ui.start()
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
@@ -97,7 +99,6 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
-
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
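
The SparkContext hunk above is the caller-side half of the "make it not be an object" cleanup: each SparkContext now owns its own BlockManagerUI instance rather than all contexts sharing one singleton server. A minimal sketch of the shape of that refactor (only the names in the patch are real):

    // Before: process-wide singleton; two SparkContexts in one JVM would
    // contend for a single UI server.
    //   spark.storage.BlockManagerUI.start(actorSystem, masterActor, sc)

    // After: one UI instance per context, started explicitly and kept as a field.
    private[spark] val ui = new BlockManagerUI(
      env.actorSystem, env.blockManager.master.masterActor, this)
    ui.start()
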
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ae77264372..1e58d01273 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,7 +1,7 @@
package spark
import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
import scala.io.Source
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
/**
* Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
}
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}
+
+ /**
+ * Try to find a free port to bind to on the local host. This should ideally never be needed,
+ * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+ * don't let users bind to port 0 and then figure out which free port they actually bound to.
+ * We work around this by binding a ServerSocket and immediately unbinding it. This is *not*
+ * necessarily guaranteed to work, but it's the best we can do.
+ */
+ def findFreePort(): Int = {
+ val socket = new ServerSocket(0)
+ val portBound = socket.getLocalPort
+ socket.close()
+ portBound
+ }
}
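
As the doc comment admits, the probe-and-release approach in findFreePort is inherently racy: another process can bind the returned port between socket.close() and the server's subsequent bind. A hedged sketch of how a caller could retry around that race (startOnFreePort and startServer are illustrative names, not part of the patch):

    import java.net.{BindException, ServerSocket}

    // Illustrative retry wrapper: probe for a free port, hand it to the server,
    // and retry if another process steals the port before the server binds.
    def startOnFreePort(maxAttempts: Int)(startServer: Int => Unit): Int = {
      for (attempt <- 1 to maxAttempts) {
        val probe = new ServerSocket(0)   // port 0: let the OS pick a free port
        val port = probe.getLocalPort
        probe.close()                     // release it so the server can bind
        try {
          startServer(port)
          return port
        } catch {
          case _: BindException if attempt < maxAttempts => // lost the race; retry
        }
      }
      sys.error("Could not bind to a free port after " + maxAttempts + " attempts")
    }
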
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a01774f511 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy._
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone master.
+ */
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..ef81f072a3 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone worker.
+ */
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
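
The identical Timeout bump in MasterWebUI and WorkerWebUI matters because both handlers use Akka's ask pattern: the implicit Timeout caps how long the resulting future waits for the actor's reply, and one second was presumably too tight for a busy master or worker. A minimal sketch of the pattern on the Akka 2.0-era API used here (RequestState and the String reply are invented for illustration):

    import akka.actor.ActorRef
    import akka.dispatch.Future
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._

    case object RequestState  // hypothetical request message

    def fetchState(worker: ActorRef): Future[String] = {
      implicit val timeout = Timeout(10 seconds)  // the ask future fails after 10s
      (worker ? RequestState).mapTo[String]       // '?' returns Future[Any]
    }
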
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 956ede201e..eda320fa47 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,32 +1,41 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
-import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.Directives
import scala.collection.mutable.ArrayBuffer
-import spark.{Logging, SparkContext, SparkEnv}
+import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
private[spark]
-object BlockManagerUI extends Logging {
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+ extends Directives with Logging {
+
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+ implicit val timeout = Timeout(10 seconds)
- /* Starts the Web interface for the BlockManager */
- def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
- val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
+ /** Start a HTTP server to run the Web interface */
+ def start() {
try {
- // TODO: This needs to find a random free port to bind to. Unfortunately, there's no way
- // in spray to do that, so we'll have to rely on something like new ServerSocket()
- val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0",
- Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt,
- webUIDirectives.handler, "BlockManagerHTTPServer")
- logInfo("Started BlockManager web UI at %s:%d".format(Utils.localHostName(), boundPort))
+ val port = if (System.getProperty("spark.ui.port") != null) {
+ System.getProperty("spark.ui.port").toInt
+ } else {
+ // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+ // random port it bound to, so we have to try to find a local one by creating a socket.
+ Utils.findFreePort()
+ }
+ AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+ logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
} catch {
case e: Exception =>
logError("Failed to create BlockManager WebUI", e)
@@ -34,58 +43,43 @@ object BlockManagerUI extends Logging {
}
}
-}
-
-
-private[spark]
-class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef,
- sc: SparkContext) extends Directives {
-
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
-
val handler = {
-
- get { path("") { completeWith {
- // Request the current storage status from the Master
- val future = master ? GetStorageStatus
- future.map { status =>
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-
- // Calculate macro-level statistics
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
-
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
- }}} ~
- get { path("rdd") { parameter("id") { id => { completeWith {
- val future = master ? GetStorageStatus
- future.map { status =>
- val prefix = "rdd_" + id.toString
-
-
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
-
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-
+ get {
+ path("") {
+ completeWith {
+ // Request the current storage status from the Master
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ // Calculate macro-level statistics
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+ }
+ }
+ } ~
+ path("rdd") {
+ parameter("id") { id =>
+ completeWith {
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+ }
+ }
+ }
+ } ~
+ pathPrefix("static") {
+ getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
- }}}}} ~
- pathPrefix("static") {
- getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
-
}
-
-
-
}
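
The port-selection logic in BlockManagerUI.start() follows a common "explicit config wins, otherwise pick one" fallback. The same decision pulled out into a standalone sketch (spark.ui.port and Utils.findFreePort are from this patch; choosePort is an illustrative name):

    // Sketch of the fallback used above: honor a user-pinned port if set,
    // otherwise probe the OS for a free one.
    def choosePort(): Int =
      Option(System.getProperty("spark.ui.port")) match {
        case Some(p) => p.toInt               // user fixed the UI port explicitly
        case None    => Utils.findFreePort()  // pick a random free local port
      }
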
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 775ff8f1aa..e0fdeffbc4 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,6 +1,6 @@
package spark.util
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.util.duration._
import akka.pattern.ask
@@ -55,7 +55,7 @@ private[spark] object AkkaUtils {
* handle requests. Returns the bound port or throws a SparkException on failure.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
- name: String = "HttpServer"): Int = {
+ name: String = "HttpServer"): ActorRef = {
val ioWorker = new IoWorker(actorSystem).start()
val httpService = actorSystem.actorOf(Props(new HttpService(route)))
val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
@@ -67,7 +67,7 @@ private[spark] object AkkaUtils {
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
- return bound.endpoint.getPort
+ return server
case other: Any =>
throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
}
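
One side effect of this hunk: startSprayServer now returns the server ActorRef rather than the bound port, while the scaladoc quoted in the hunk context still says it "Returns the bound port" — the comment was not updated in this commit. The practical contract after the change is that the caller picks the port up front (which is what findFreePort enables) and keeps the ActorRef, e.g. to stop the server later. A hedged caller-side sketch (actorSystem and handler as in BlockManagerUI):

    // After this change the caller owns the port choice and the server handle.
    val port = Utils.findFreePort()
    val server: ActorRef = AkkaUtils.startSprayServer(
      actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
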
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 721c4c6029..51fb440108 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
import spark.Logging
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
+ */
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
val delaySeconds = MetadataCleaner.getDelaySeconds
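
The diff shown here is truncated, but the new doc comment summarizes the class well: MetadataCleaner wraps a java.util.Timer that periodically invokes cleanupFunc. A simplified, self-contained sketch of that shape (class and parameter names are illustrative; only the Timer/TimerTask mechanics are the point):

    import java.util.{Timer, TimerTask}

    // Simplified periodic cleaner: every periodSeconds, call cleanupFunc with a
    // cutoff timestamp so callers can drop entries older than the cutoff.
    class PeriodicCleaner(name: String, periodSeconds: Int, cleanupFunc: Long => Unit) {
      private val timer = new Timer(name + " cleanup timer", true)  // daemon thread

      private val task = new TimerTask {
        override def run() {
          cleanupFunc(System.currentTimeMillis() - periodSeconds * 1000L)
        }
      }

      timer.schedule(task, periodSeconds * 1000L, periodSeconds * 1000L)

      def cancel() { timer.cancel() }
    }
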