Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala  1
-rw-r--r--  core/src/main/scala/spark/RDD.scala  25
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala  15
-rw-r--r--  core/src/main/scala/spark/TaskEndReason.scala  4
-rw-r--r--  core/src/main/scala/spark/Utils.scala  45
-rw-r--r--  core/src/main/scala/spark/deploy/JsonProtocol.scala  112
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala  4
-rw-r--r--  core/src/main/scala/spark/deploy/WebUI.scala  2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala  22
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala  78
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala  101
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/IndexPage.scala  119
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala  59
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala  23
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala  61
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala  100
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala  78
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala  10
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala  23
-rw-r--r--  core/src/main/scala/spark/scheduler/InputFormatInfo.scala  4
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/JobResult.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/JobWaiter.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala  1
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala  1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala  1
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala  17
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala  1
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala  82
-rw-r--r--  core/src/main/scala/spark/ui/JettyUtils.scala  115
-rw-r--r--  core/src/main/scala/spark/ui/Page.scala  3
-rw-r--r--  core/src/main/scala/spark/ui/SparkUI.scala  63
-rw-r--r--  core/src/main/scala/spark/ui/UIUtils.scala  113
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala  71
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala  112
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala  127
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala  114
-rw-r--r--  core/src/main/scala/spark/ui/storage/BlockManagerUI.scala  24
-rw-r--r--  core/src/main/scala/spark/ui/storage/IndexPage.scala  64
-rw-r--r--  core/src/main/scala/spark/ui/storage/RDDPage.scala  104
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala  39
42 files changed, 1551 insertions, 395 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index fe812fe530..0095b868a8 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e88290fdb2..8dde5391d3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -108,6 +108,9 @@ abstract class RDD[T: ClassTag](
// Methods and fields available on all RDDs
// =======================================================================
+ /** The SparkContext that created this RDD. */
+ def sparkContext: SparkContext = sc
+
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
@@ -284,31 +287,35 @@ abstract class RDD[T: ClassTag](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- val multiplier = 3.0
- val initialCount = count()
+ var multiplier = 3.0
+ var initialCount = this.count()
var maxSelected = 0
+ if (num < 0) {
+ throw new IllegalArgumentException("Negative number of elements requested")
+ }
+
if (initialCount > Integer.MAX_VALUE - 1) {
maxSelected = Integer.MAX_VALUE - 1
} else {
maxSelected = initialCount.toInt
}
- if (num > initialCount) {
+ if (num > initialCount && !withReplacement) {
total = maxSelected
- fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
- } else if (num < 0) {
- throw(new IllegalArgumentException("Negative number of elements requested"))
+ fraction = multiplier * (maxSelected + 1) / initialCount
} else {
- fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
+ fraction = multiplier * (num + 1) / initialCount
total = num
}
val rand = new Random(seed)
- var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+ var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ // If the first sample didn't turn out large enough, keep trying to take samples;
+    // this shouldn't happen often because we use a big multiplier for the initial size
while (samples.length < total) {
- samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+ samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
}
Utils.randomizeInPlace(samples, rand).take(total)
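Usage sketch (not part of the patch): the revised takeSample keeps the same signature; only the bounds check and the retry loop around the fraction-based sample changed. A minimal caller, assuming a local master:

import spark.SparkContext

object TakeSampleExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "takeSample-example")
    val rdd = sc.parallelize(1 to 1000)
    // Without replacement, at most 10 distinct elements come back; the
    // fraction-based sample is simply re-drawn if it comes up short.
    val sample = rdd.takeSample(withReplacement = false, num = 10, seed = 42)
    println(sample.mkString(", "))
    sc.stop()
  }
}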
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index ef6de87193..01d9a70a78 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
@@ -49,8 +50,9 @@ import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
+import ui.{SparkUI}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -96,11 +98,6 @@ class SparkContext(
isLocal)
SparkEnv.set(env)
- // Start the BlockManager UI
- private[spark] val ui = new BlockManagerUI(
- env.actorSystem, env.blockManager.master.driverActor, this)
- ui.start()
-
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
@@ -109,6 +106,9 @@ class SparkContext(
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+  // Initialize the Spark UI
+ private[spark] val ui = new SparkUI(this)
+ ui.bind()
// Add each JAR given through the constructor
if (jars != null) {
@@ -217,6 +217,8 @@ class SparkContext(
@volatile private var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
+ ui.start()
+
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val conf = SparkHadoopUtil.newConfiguration()
@@ -579,6 +581,7 @@ class SparkContext(
/** Shut down the SparkContext. */
def stop() {
+ ui.stop()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 8140cba084..bb75ec208c 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -1,5 +1,6 @@
package spark
+import spark.executor.TaskMetrics
import spark.storage.BlockManagerId
/**
@@ -24,7 +25,8 @@ private[spark] case class FetchFailed(
private[spark] case class ExceptionFailure(
className: String,
description: String,
- stackTrace: Array[StackTraceElement])
+ stackTrace: Array[StackTraceElement],
+ metrics: Option[TaskMetrics])
extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e02507f83e..f90b2ccaa1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -490,6 +490,26 @@ private object Utils extends Logging {
}
/**
+ * Returns a human-readable string representing a duration such as "35ms"
+ */
+ def msDurationToString(ms: Long): String = {
+ val second = 1000
+ val minute = 60 * second
+ val hour = 60 * minute
+
+ ms match {
+ case t if t < second =>
+ "%d ms".format(t)
+ case t if t < minute =>
+ "%.1f s".format(t.toFloat / second)
+ case t if t < hour =>
+ "%.1f m".format(t.toFloat / minute)
+ case t =>
+ "%.2f h".format(t.toFloat / hour)
+ }
+ }
+
+ /**
* Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
*/
def memoryMegabytesToString(megabytes: Long): String = {
@@ -604,18 +624,19 @@ private object Utils extends Logging {
"%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
callSiteInfo.firstUserLine)
}
- /**
- * Try to find a free port to bind to on the local host. This should ideally never be needed,
- * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
- * don't let users bind to port 0 and then figure out which free port they actually bound to.
- * We work around this by binding a ServerSocket and immediately unbinding it. This is *not*
- * necessarily guaranteed to work, but it's the best we can do.
- */
- def findFreePort(): Int = {
- val socket = new ServerSocket(0)
- val portBound = socket.getLocalPort
- socket.close()
- portBound
+
+ /** Return a string containing the last `n` bytes of a file. */
+ def lastNBytes(path: String, n: Int): String = {
+ val file = new File(path)
+ val length = file.length()
+ val buff = new Array[Byte](math.min(n, length.toInt))
+ val skip = math.max(0, length - n)
+ val stream = new FileInputStream(file)
+
+ stream.skip(skip)
+ stream.read(buff)
+ stream.close()
+ Source.fromBytes(buff).mkString
}
/**
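For reference (not part of the patch), the thresholds in msDurationToString produce output as in the standalone sketch below; the sketch mirrors the logic above rather than calling it, since Utils is private to the spark package.

object DurationFormatExample {
  // Mirrors the thresholds of Utils.msDurationToString shown in the hunk above.
  def format(ms: Long): String = {
    val second = 1000
    val minute = 60 * second
    val hour = 60 * minute
    ms match {
      case t if t < second => "%d ms".format(t)
      case t if t < minute => "%.1f s".format(t.toFloat / second)
      case t if t < hour   => "%.1f m".format(t.toFloat / minute)
      case t               => "%.2f h".format(t.toFloat / hour)
    }
  }

  def main(args: Array[String]) {
    println(format(35))        // "35 ms"
    println(format(45000))     // "45.0 s"
    println(format(150000))    // "2.5 m"
    println(format(9000000))   // "2.50 h"
  }
}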
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 88b03a007c..607d34d111 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -1,79 +1,65 @@
package spark.deploy
import master.{ApplicationInfo, WorkerInfo}
+import net.liftweb.json.JsonDSL._
import worker.ExecutorRunner
-import spray.json._
-/**
- * spray-json helper class containing implicit conversion to json for marshalling responses
- */
-private[spark] object JsonProtocol extends DefaultJsonProtocol {
- implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
- def write(obj: WorkerInfo) = JsObject(
- "id" -> JsString(obj.id),
- "host" -> JsString(obj.host),
- "port" -> JsNumber(obj.port),
- "webuiaddress" -> JsString(obj.webUiAddress),
- "cores" -> JsNumber(obj.cores),
- "coresused" -> JsNumber(obj.coresUsed),
- "memory" -> JsNumber(obj.memory),
- "memoryused" -> JsNumber(obj.memoryUsed)
- )
- }
+private[spark] object JsonProtocol {
+ def writeWorkerInfo(obj: WorkerInfo) = {
+ ("id" -> obj.id) ~
+ ("host" -> obj.host) ~
+ ("port" -> obj.port) ~
+ ("webuiaddress" -> obj.webUiAddress) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed)
+ }
- implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] {
- def write(obj: ApplicationInfo) = JsObject(
- "starttime" -> JsNumber(obj.startTime),
- "id" -> JsString(obj.id),
- "name" -> JsString(obj.desc.name),
- "cores" -> JsNumber(obj.desc.maxCores),
- "user" -> JsString(obj.desc.user),
- "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
- "submitdate" -> JsString(obj.submitDate.toString))
+ def writeApplicationInfo(obj: ApplicationInfo) = {
+ ("starttime" -> obj.startTime) ~
+ ("id" -> obj.id) ~
+ ("name" -> obj.desc.name) ~
+ ("cores" -> obj.desc.maxCores) ~
+ ("user" -> obj.desc.user) ~
+ ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+ ("submitdate" -> obj.submitDate.toString)
}
- implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] {
- def write(obj: ApplicationDescription) = JsObject(
- "name" -> JsString(obj.name),
- "cores" -> JsNumber(obj.maxCores),
- "memoryperslave" -> JsNumber(obj.memoryPerSlave),
- "user" -> JsString(obj.user)
- )
+ def writeApplicationDescription(obj: ApplicationDescription) = {
+ ("name" -> obj.name) ~
+ ("cores" -> obj.maxCores) ~
+ ("memoryperslave" -> obj.memoryPerSlave) ~
+ ("user" -> obj.user)
}
- implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] {
- def write(obj: ExecutorRunner) = JsObject(
- "id" -> JsNumber(obj.execId),
- "memory" -> JsNumber(obj.memory),
- "appid" -> JsString(obj.appId),
- "appdesc" -> obj.appDesc.toJson.asJsObject
- )
+ def writeExecutorRunner(obj: ExecutorRunner) = {
+ ("id" -> obj.execId) ~
+ ("memory" -> obj.memory) ~
+ ("appid" -> obj.appId) ~
+ ("appdesc" -> writeApplicationDescription(obj.appDesc))
}
- implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
- def write(obj: MasterState) = JsObject(
- "url" -> JsString("spark://" + obj.uri),
- "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
- "cores" -> JsNumber(obj.workers.map(_.cores).sum),
- "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
- "memory" -> JsNumber(obj.workers.map(_.memory).sum),
- "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
- "activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)),
- "completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson))
- )
+ def writeMasterState(obj: MasterState) = {
+ ("url" -> ("spark://" + obj.uri)) ~
+ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+ ("cores" -> obj.workers.map(_.cores).sum) ~
+ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
+ ("memory" -> obj.workers.map(_.memory).sum) ~
+ ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
+ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
- implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
- def write(obj: WorkerState) = JsObject(
- "id" -> JsString(obj.workerId),
- "masterurl" -> JsString(obj.masterUrl),
- "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
- "cores" -> JsNumber(obj.cores),
- "coresused" -> JsNumber(obj.coresUsed),
- "memory" -> JsNumber(obj.memory),
- "memoryused" -> JsNumber(obj.memoryUsed),
- "executors" -> JsArray(obj.executors.toList.map(_.toJson)),
- "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
- )
+ def writeWorkerState(obj: WorkerState) = {
+ ("id" -> obj.workerId) ~
+ ("masterurl" -> obj.masterUrl) ~
+ ("masterwebuiurl" -> obj.masterWebUiUrl) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed) ~
+ ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
+ ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
}
}
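Illustration (not from the patch) of the lift-json DSL the rewritten JsonProtocol relies on: fields chained with ~ build up a JSON object, which can then be rendered to a string with the same pretty and render helpers imported by the new JettyUtils below. The field values here are made up.

import net.liftweb.json.JsonDSL._
import net.liftweb.json.{pretty, render}

object JsonDslExample {
  def main(args: Array[String]) {
    // Same shape as writeWorkerInfo above, with hypothetical values.
    val worker =
      ("id" -> "worker-20130601-0001") ~
      ("cores" -> 8) ~
      ("coresused" -> 2) ~
      ("memory" -> 16384)
    println(pretty(render(worker)))
  }
}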
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 2b0b3b10e7..494b1cc6f0 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -17,11 +17,11 @@ import scala.collection.mutable.ArrayBuffer
*/
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-
+
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
-
+
def start(): String = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
index ad1a1092b2..844c4142c7 100644
--- a/core/src/main/scala/spark/deploy/WebUI.scala
+++ b/core/src/main/scala/spark/deploy/WebUI.scala
@@ -6,7 +6,7 @@ import java.util.Date
/**
* Utilities used throughout the web UI.
*/
-private[spark] object WebUI {
+private[spark] object DeployWebUI {
val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
def formatDate(date: Date): String = DATE_FORMAT.format(date)
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 770cfe9d05..d31e6735b7 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import spark.deploy._
import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils
+import ui.MasterWebUI
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -35,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
+ val webUi = new MasterWebUI(self)
+
Utils.checkHost(host, "Expected hostname")
val masterPublicAddress = {
@@ -51,20 +54,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- startWebUi()
+ webUi.start()
import context.dispatcher
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
}
- def startWebUi() {
- val webUi = new MasterWebUI(self)
- try {
- AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
- } catch {
- case e: Exception =>
- logError("Failed to create web UI", e)
- System.exit(1)
- }
+ override def postStop() {
+ webUi.stop()
}
override def receive = {
@@ -76,7 +72,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort)
schedule()
}
}
@@ -279,7 +275,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.state = ExecutorState.KILLED
}
app.markFinished(state)
- app.driver ! ApplicationRemoved(state.toString)
+ if (state != ApplicationState.FINISHED) {
+ app.driver ! ApplicationRemoved(state.toString)
+ }
schedule()
}
}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
deleted file mode 100644
index 34cee87853..0000000000
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package spark.deploy.master
-
-import akka.actor.{ActorRef, ActorContext, ActorRefFactory}
-import scala.concurrent.Await
-import akka.pattern.ask
-
-import akka.util.Timeout
-import scala.concurrent.duration._
-import spray.routing.Directives
-import spray.routing.directives._
-import spray.httpx.TwirlSupport._
-import spray.httpx.SprayJsonSupport._
-import spray.http.MediaTypes._
-
-import spark.deploy._
-import spark.deploy.JsonProtocol._
-
-/**
- * Web UI server for the standalone master.
- */
-private[spark]
-class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives {
- import context.dispatcher
-
- val actorSystem = context.system
- val RESOURCE_DIR = "spark/deploy/master/webui"
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
- implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
-
- val handler = {
- get {
- (path("") & parameters('format ?)) {
- case Some(js) if js.equalsIgnoreCase("json") =>
- val future = (master ? RequestMasterState).mapTo[MasterState]
- respondWithMediaType(`application/json`) { ctx =>
- ctx.complete(future.mapTo[MasterState])
- }
- case _ =>
- complete {
- val future = (master ? RequestMasterState).mapTo[MasterState]
- future.map {
- masterState => spark.deploy.master.html.index.render(masterState)
- }
- }
- } ~
- path("app") {
- parameters("appId", 'format ?) {
- case (appId, Some(js)) if (js.equalsIgnoreCase("json")) =>
- val future = master ? RequestMasterState
- val appInfo = for (masterState <- future.mapTo[MasterState]) yield {
- masterState.activeApps.find(_.id == appId).getOrElse({
- masterState.completedApps.find(_.id == appId).getOrElse(null)
- })
- }
- respondWithMediaType(`application/json`) { ctx =>
- ctx.complete(appInfo.mapTo[ApplicationInfo])
- }
- case (appId, _) =>
- complete {
- val future = master ? RequestMasterState
- future.map { state =>
- val masterState = state.asInstanceOf[MasterState]
- val app = masterState.activeApps.find(_.id == appId).getOrElse({
- masterState.completedApps.find(_.id == appId).getOrElse(null)
- })
- spark.deploy.master.html.app_details.render(app)
- }
- }
- }
- } ~
- pathPrefix("static") {
- getFromResourceDirectory(STATIC_RESOURCE_DIR)
- } ~
- getFromResourceDirectory(RESOURCE_DIR)
- }
- }
-}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
new file mode 100644
index 0000000000..939f8c9587
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -0,0 +1,101 @@
+package spark.deploy.master.ui
+
+import akka.pattern.ask
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import net.liftweb.json.JsonAST.JValue
+
+import scala.xml.Node
+
+import spark.deploy.{RequestMasterState, JsonProtocol, MasterState}
+import spark.deploy.master.ExecutorInfo
+import spark.ui.UIUtils
+
+private[spark] class ApplicationPage(parent: MasterWebUI) {
+ val master = parent.master
+ implicit val timeout = parent.timeout
+
+ /** Executor details for a particular application */
+ def renderJson(request: HttpServletRequest): JValue = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 30 seconds)
+ val app = state.activeApps.find(_.id == appId).getOrElse({
+ state.completedApps.find(_.id == appId).getOrElse(null)
+ })
+ JsonProtocol.writeApplicationInfo(app)
+ }
+
+ /** Executor details for a particular application */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 30 seconds)
+ val app = state.activeApps.find(_.id == appId).getOrElse({
+ state.completedApps.find(_.id == appId).getOrElse(null)
+ })
+
+ val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
+ val executors = app.executors.values.toSeq
+ val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+
+ val content =
+ <hr />
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {app.id}</li>
+ <li><strong>Description:</strong> {app.desc.name}</li>
+ <li><strong>User:</strong> {app.desc.user}</li>
+ <li><strong>Cores:</strong>
+ {
+ if (app.desc.maxCores == Integer.MAX_VALUE) {
+ "Unlimited %s granted".format(app.coresGranted)
+ } else {
+ "%s (%s granted, %s left)".format(
+ app.desc.maxCores, app.coresGranted, app.coresLeft)
+ }
+ }
+ </li>
+ <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li>
+ <li><strong>Submit Date:</strong> {app.submitDate}</li>
+ <li><strong>State:</strong> {app.state}</li>
+ <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
+ </ul>
+ </div>
+ </div>
+
+ <hr/>
+
+ <div class="row"> <!-- Executors -->
+ <div class="span12">
+ <h3> Executor Summary </h3>
+ <br/>
+ {executorTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Application Info: " + app.desc.name)
+ }
+
+ def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ <tr>
+ <td>{executor.id}</td>
+ <td>
+ <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+ </td>
+ <td>{executor.cores}</td>
+ <td>{executor.memory}</td>
+ <td>{executor.state}</td>
+ <td>
+ <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+ </td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
new file mode 100644
index 0000000000..73e378f988
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -0,0 +1,119 @@
+package spark.deploy.master.ui
+
+import scala.concurrent.Await
+import akka.pattern.ask
+import scala.concurrent.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import spark.deploy.{RequestMasterState, DeployWebUI, MasterState}
+import spark.Utils
+import spark.ui.UIUtils
+import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+
+private[spark] class IndexPage(parent: MasterWebUI) {
+ val master = parent.master
+ implicit val timeout = parent.timeout
+
+ /** Index view listing applications and executors */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 30 seconds)
+
+ val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
+ val workers = state.workers.sortBy(_.id)
+ val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
+
+ val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User",
+ "State", "Duration")
+ val activeApps = state.activeApps.sortBy(_.startTime).reverse
+ val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+ val completedApps = state.completedApps.sortBy(_.endTime).reverse
+ val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+
+ val content =
+ <hr />
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>URL:</strong>{state.uri}</li>
+ <li><strong>Workers:</strong>{state.workers.size}</li>
+ <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
+ {state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong>
+ {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+ {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+ <li><strong>Applications:</strong>
+ {state.activeApps.size} Running,
+ {state.completedApps.size} Completed </li>
+ </ul>
+ </div>
+ </div>
+
+ <div class="row">
+ <div class="span12">
+ <h3> Workers </h3>
+ <br/>
+ {workerTable}
+ </div>
+ </div>
+
+ <hr/>
+
+ <div class="row">
+ <div class="span12">
+ <h3> Running Applications </h3>
+ <br/>
+ {activeAppsTable}
+ </div>
+ </div>
+
+ <hr/>
+
+ <div class="row">
+ <div class="span12">
+ <h3> Completed Applications </h3>
+ <br/>
+ {completedAppsTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Spark Master: " + state.uri)
+ }
+
+ def workerRow(worker: WorkerInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={worker.webUiAddress}>{worker.id}</a>
+ </td>
+ <td>{worker.host}:{worker.port}</td>
+ <td>{worker.state}</td>
+ <td>{worker.cores} ({worker.coresUsed} Used)</td>
+ <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
+ {Utils.memoryMegabytesToString(worker.memory)}
+ ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
+ </td>
+ </tr>
+ }
+
+
+ def appRow(app: ApplicationInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"app?appId=" + app.id}>{app.id}</a>
+ </td>
+ <td>{app.desc.name}</td>
+ <td>
+ {app.coresGranted}
+ </td>
+ <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
+ {Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}
+ </td>
+ <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+ <td>{app.desc.user}</td>
+ <td>{app.state.toString}</td>
+ <td>{DeployWebUI.formatDuration(app.duration)}</td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
new file mode 100644
index 0000000000..bcc4d49234
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -0,0 +1,59 @@
+package spark.deploy.master.ui
+
+import akka.actor.ActorRef
+import scala.concurrent.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import spark.{Logging, Utils}
+import spark.ui.JettyUtils
+import spark.ui.JettyUtils._
+
+/**
+ * Web UI server for the standalone master.
+ */
+private[spark]
+class MasterWebUI(val master: ActorRef, requestedPort: Option[Int] = None) extends Logging {
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val host = Utils.localHostName()
+ val port = requestedPort.getOrElse(
+ System.getProperty("master.ui.port", MasterWebUI.DEFAULT_PORT).toInt)
+
+ var server: Option[Server] = None
+ var boundPort: Option[Int] = None
+
+ val applicationPage = new ApplicationPage(this)
+ val indexPage = new IndexPage(this)
+
+ def start() {
+ try {
+ val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ server = Some(srv)
+ boundPort = Some(bPort)
+ logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Master JettyUtils", e)
+ System.exit(1)
+ }
+ }
+
+ val handlers = Array[(String, Handler)](
+ ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
+ ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
+ ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+ ("*", (request: HttpServletRequest) => indexPage.render(request))
+ )
+
+ def stop() {
+ server.foreach(_.stop())
+ }
+}
+
+private[spark] object MasterWebUI {
+ val STATIC_RESOURCE_DIR = "spark/ui/static"
+ val DEFAULT_PORT = "8080"
+}
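Lifecycle sketch (not part of the patch) of how the Master actor in the hunk above drives this class; masterRef stands in for the Master's ActorRef and is assumed to exist in the caller's scope:

import akka.actor.ActorRef
import spark.deploy.master.ui.MasterWebUI

object MasterUIExample {
  // masterRef is assumed to be the Master actor's ActorRef.
  def runMasterUI(masterRef: ActorRef) {
    val webUi = new MasterWebUI(masterRef, Some(8080))
    webUi.start()                  // binds Jetty; boundPort is filled in on success
    println("Master UI at port " + webUi.boundPort.get)
    webUi.stop()
  }
}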
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index b5dfd16e67..7c1871e047 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -14,7 +14,7 @@ import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master
import java.io.File
-
+import ui.WorkerWebUI
private[spark] class Worker(
host: String,
@@ -45,6 +45,7 @@ private[spark] class Worker(
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
+ var webUi: WorkerWebUI = null
var coresUsed = 0
var memoryUsed = 0
@@ -76,35 +77,26 @@ private[spark] class Worker(
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
+ webUi = new WorkerWebUI(self, workDir, Some(webUiPort))
+ webUi.start()
connectToMaster()
- startWebUi()
}
def connectToMaster() {
logInfo("Connecting to master " + masterUrl)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
- master ! RegisterWorker(workerId, host, port, cores, memory, webUiPort, publicAddress)
+ master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
- def startWebUi() {
- val webUi = new WorkerWebUI(self, workDir)
- try {
- AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
- } catch {
- case e: Exception =>
- logError("Failed to create web UI", e)
- System.exit(1)
- }
- }
+ import context.dispatcher
override def receive = {
case RegisteredWorker(url) =>
masterWebUiUrl = url
logInfo("Successfully registered with master")
- import context.dispatcher
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
master ! Heartbeat(workerId)
}
@@ -170,6 +162,7 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
+ webUi.stop()
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
deleted file mode 100644
index cc2ab6187a..0000000000
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package spark.deploy.worker
-
-import akka.actor.{ActorRef, ActorContext}
-import scala.concurrent.Await
-import akka.pattern.ask
-
-import akka.util.Timeout
-import scala.concurrent.duration._
-import spray.routing.Directives
-import spray.httpx.TwirlSupport._
-import spray.httpx.SprayJsonSupport._
-import spray.http.MediaTypes._
-
-import spark.deploy.{WorkerState, RequestWorkerState}
-import spark.deploy.JsonProtocol._
-import java.io.File
-
-/**
- * Web UI server for the standalone worker.
- */
-private[spark]
-class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext) extends Directives {
- import context.dispatcher
-
- val actorSystem = context.system
- val RESOURCE_DIR = "spark/deploy/worker/webui"
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
- implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
-
- val handler = {
- get {
- (path("") & parameters('format ?)) {
- case Some(js) if js.equalsIgnoreCase("json") => {
- val future = (worker ? RequestWorkerState).mapTo[WorkerState]
- respondWithMediaType(`application/json`) { ctx =>
- ctx.complete(future)
- }
- }
- case _ =>
- complete {
- val future = (worker ? RequestWorkerState).mapTo[WorkerState]
- future.map { workerState =>
- spark.deploy.worker.html.index(workerState)
- }
- }
- } ~
- path("log") {
- parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
- respondWithMediaType(`text/plain`) {
- getFromFile(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
- }
- }
- } ~
- pathPrefix("static") {
- getFromResourceDirectory(STATIC_RESOURCE_DIR)
- } ~
- getFromResourceDirectory(RESOURCE_DIR)
- }
- }
-}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
new file mode 100644
index 0000000000..8cb74632aa
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -0,0 +1,100 @@
+package spark.deploy.worker.ui
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
+import akka.pattern.ask
+
+import javax.servlet.http.HttpServletRequest
+
+import net.liftweb.json.JsonAST.JValue
+
+import scala.xml.Node
+
+import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState}
+import spark.deploy.worker.ExecutorRunner
+import spark.Utils
+import spark.ui.UIUtils
+
+private[spark] class IndexPage(parent: WorkerWebUI) {
+ val worker = parent.worker
+ val timeout = parent.timeout
+
+ def renderJson(request: HttpServletRequest): JValue = {
+ val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val workerState = Await.result(stateFuture, 30 seconds)
+ JsonProtocol.writeWorkerState(workerState)
+ }
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val workerState = Await.result(stateFuture, 30 seconds)
+
+ val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
+ val runningExecutorTable =
+ UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
+ val finishedExecutorTable =
+ UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+
+ val content =
+ <hr />
+ <div class="row"> <!-- Worker Details -->
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {workerState.workerId}</li>
+ <li><strong>
+ Master URL:</strong> {workerState.masterUrl}
+ </li>
+ <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+ <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
+ ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
+ </ul>
+ <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+ </div>
+ </div>
+ <hr/>
+
+ <div class="row"> <!-- Running Executors -->
+ <div class="span12">
+ <h3> Running Executors {workerState.executors.size} </h3>
+ <br/>
+ {runningExecutorTable}
+ </div>
+ </div>
+ <hr/>
+
+ <div class="row"> <!-- Finished Executors -->
+ <div class="span12">
+ <h3> Finished Executors </h3>
+ <br/>
+ {finishedExecutorTable}
+ </div>
+ </div>;
+
+ UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ }
+
+ def executorRow(executor: ExecutorRunner): Seq[Node] = {
+ <tr>
+ <td>{executor.execId}</td>
+ <td>{executor.cores}</td>
+ <td sorttable_customkey={executor.memory.toString}>
+ {Utils.memoryMegabytesToString(executor.memory)}
+ </td>
+ <td>
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {executor.appId}</li>
+ <li><strong>Name:</strong> {executor.appDesc.name}</li>
+ <li><strong>User:</strong> {executor.appDesc.user}</li>
+ </ul>
+ </td>
+ <td>
+ <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.appId, executor.execId)}>stdout</a>
+ <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.appId, executor.execId)}>stderr</a>
+ </td>
+ </tr>
+ }
+
+}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
new file mode 100644
index 0000000000..b1336dd1af
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -0,0 +1,78 @@
+package spark.deploy.worker.ui
+
+import akka.actor.ActorRef
+import akka.util.Timeout
+
+import scala.concurrent.duration._
+
+import java.io.{FileInputStream, File}
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import spark.{Utils, Logging}
+import spark.ui.JettyUtils
+import spark.ui.JettyUtils._
+
+/**
+ * Web UI server for the standalone worker.
+ */
+private[spark]
+class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
+ extends Logging {
+ implicit val timeout = Timeout(
+ Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val host = Utils.localHostName()
+ val port = requestedPort.getOrElse(
+ System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+
+ var server: Option[Server] = None
+ var boundPort: Option[Int] = None
+
+ val indexPage = new IndexPage(this)
+
+ val handlers = Array[(String, Handler)](
+ ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
+ ("/log", (request: HttpServletRequest) => log(request)),
+ ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
+ ("*", (request: HttpServletRequest) => indexPage.render(request))
+ )
+
+ def start() {
+ try {
+ val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ server = Some(srv)
+ boundPort = Some(bPort)
+ logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Worker JettyUtils", e)
+ System.exit(1)
+ }
+ }
+
+ def log(request: HttpServletRequest): String = {
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+
+ val maxBytes = 1024 * 1024 // Guard against OOM
+ val defaultBytes = 100 * 1024
+ val numBytes = Option(request.getParameter("numBytes"))
+ .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes)
+
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+ val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
+ pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes))
+ }
+
+ def stop() {
+ server.foreach(_.stop())
+ }
+}
+
+private[spark] object WorkerWebUI {
+ val STATIC_RESOURCE_DIR = "spark/ui/static"
+ val DEFAULT_PORT="8081"
+}
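To show how the new /log endpoint is meant to be hit (a sketch, not part of the patch; the host, port, and IDs are hypothetical): the worker returns the last numBytes of the requested executor log as a plain string, so any HTTP client works.

import scala.io.Source

object FetchWorkerLogExample {
  def main(args: Array[String]) {
    // Hypothetical worker host/port and application/executor IDs.
    val url = "http://worker-host:8081/log?appId=app-20130601-0000&executorId=0" +
      "&logType=stderr&numBytes=4096"
    println(Source.fromURL(url).mkString)
  }
}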
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2bf55ea9a9..8360547a74 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -93,15 +93,18 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + taskId)
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
+ var attemptedTask: Option[Task[Any]] = None
+ var taskStart: Long = 0
try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+ attemptedTask = Some(task)
logInfo("Its generation is " + task.generation)
env.mapOutputTracker.updateGeneration(task.generation)
- val taskStart = System.currentTimeMillis()
+ taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
task.metrics.foreach{ m =>
@@ -129,7 +132,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
case t: Throwable => {
- val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+ val serviceTime = (System.currentTimeMillis() - taskStart).toInt
+ val metrics = attemptedTask.flatMap(t => t.metrics)
+ metrics.foreach{m => m.executorRunTime = serviceTime}
+ val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cbf5512e24..07c103503c 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
+import spark.deploy.SparkHadoopUtil
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -50,6 +51,7 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
+ SparkHadoopUtil.addCredentials(conf);
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 1164c40c43..d4bda150cb 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -257,7 +257,7 @@ class DAGScheduler(
eventQueue.put(toSubmit)
waiter.awaitResult() match {
case JobSucceeded => {}
- case JobFailed(exception: Exception) =>
+ case JobFailed(exception: Exception, _) =>
logInfo("Failed to run " + callSite)
throw exception
}
@@ -313,7 +313,7 @@ class DAGScheduler(
handleExecutorLost(execId)
case completion: CompletionEvent =>
- sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
+ sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
handleTaskCompletion(completion)
@@ -325,7 +325,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None))))
}
return true
}
@@ -504,6 +504,7 @@ class DAGScheduler(
case _ => "Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+ stage.completionTime = Some(System.currentTimeMillis)
val stageComp = StageCompleted(stageToInfos(stage))
sparkListeners.foreach{_.onStageCompleted(stageComp)}
running -= stage
@@ -525,6 +526,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
+ idToActiveJob -= stage.priority
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -618,8 +620,11 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, Some(task.generation))
}
+ case ExceptionFailure(className, description, stackTrace, metrics) =>
+ // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+
case other =>
- // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage
+ // Unrecognized failure - abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other)
}
}
@@ -652,7 +657,7 @@ class DAGScheduler(
"(generation " + currentGeneration + ")")
}
}
-
+
private def handleExecutorGained(execId: String, hostPort: String) {
// remove from failedGeneration(execId) ?
if (failedGeneration.contains(execId)) {
@@ -667,11 +672,13 @@ class DAGScheduler(
*/
private def abortStage(failedStage: Stage, reason: String) {
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
+ failedStage.completionTime = Some(System.currentTimeMillis())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))))
+ idToActiveJob -= resultStage.priority
activeJobs -= job
resultStageToJob -= resultStage
}
@@ -748,6 +755,10 @@ class DAGScheduler(
sizeBefore = pendingTasks.size
pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
+
+ sizeBefore = stageToInfos.size
+ stageToInfos.clearOldValues(cleanupTime)
+ logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size)
}
def stop() {
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 287f731787..17d0ea4f80 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -3,11 +3,13 @@ package spark.scheduler
import spark.Logging
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
+import spark.deploy.SparkHadoopUtil
/**
@@ -70,6 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val conf = new JobConf(configuration)
+ SparkHadoopUtil.addCredentials(conf);
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -89,6 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val jobConf = new JobConf(configuration)
+ SparkHadoopUtil.addCredentials(jobConf);
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 178bfaba3d..6a9d52f356 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -275,7 +275,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
var info = "JOB_ID=" + job.runId
reason match {
case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception) =>
+ case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
index 654131ee84..a0fdf391e6 100644
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/spark/scheduler/JobResult.scala
@@ -6,4 +6,4 @@ package spark.scheduler
private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception) extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index 3cc6a86345..6ff2e29434 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -35,7 +35,7 @@ private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Un
throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
}
jobFinished = true
- jobResult = JobFailed(exception)
+ jobResult = JobFailed(exception, None)
this.notifyAll()
}
}
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index bac984b5c9..8de3aa91a4 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -68,6 +68,7 @@ class StatsReportListener extends SparkListener with Logging {
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
//runtime breakdown
+
val runtimePcts = stageCompleted.stageInfo.taskInfos.map{
case (info, metrics) => RuntimePercentage(info.duration, metrics)
}
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 7fc9e13fd9..539cf8233b 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -34,6 +34,7 @@ private[spark] class Stage(
/** When first task was submitted to scheduler. */
var submissionTime: Option[Long] = None
+ var completionTime: Option[Long] = None
private var nextAttemptId = 0
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index d72b0bfc9f..fe6420a522 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -571,6 +571,7 @@ private[spark] class ClusterTaskSetManager(
return
case ef: ExceptionFailure =>
+ sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 93d4318b29..b000e328e6 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -145,6 +145,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
val ser = SparkEnv.get.closureSerializer.newInstance()
+ var attemptedTask: Option[Task[_]] = None
+ val start = System.currentTimeMillis()
+ var taskStart: Long = 0
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
@@ -153,10 +156,11 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile
- val deserStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader)
- val deserTime = System.currentTimeMillis() - deserStart
+ attemptedTask = Some(deserializedTask)
+ val deserTime = System.currentTimeMillis() - start
+ taskStart = System.currentTimeMillis()
// Run it
val result: Any = deserializedTask.run(taskId)
@@ -170,16 +174,19 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val resultToReturn = ser.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
+ val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId)
- deserializedTask.metrics.get.executorRunTime = deserTime.toInt//info.duration.toInt //close enough
+ deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
-
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case t: Throwable => {
- val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+ val serviceTime = System.currentTimeMillis() - taskStart
+ val metrics = attemptedTask.flatMap(t => t.metrics)
+ metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+ val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 70b69bb26f..f12fec41d5 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -152,6 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
info.markFailed()
decreaseRunningTasks(1)
val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
+ sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
deleted file mode 100644
index 631455abcd..0000000000
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package spark.storage
-
-import akka.actor.{ActorRef, ActorSystem}
-
-import akka.util.Timeout
-import scala.concurrent.duration._
-import spray.httpx.TwirlSupport._
-import spray.routing.Directives
-
-import spark.{Logging, SparkContext}
-import spark.util.AkkaUtils
-import spark.Utils
-
-
-/**
- * Web UI server for the BlockManager inside each SparkContext.
- */
-private[spark]
-class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
- extends Directives with Logging {
-
- implicit val implicitActorSystem = actorSystem
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
- implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- val host = Utils.localHostName()
- val port = if (System.getProperty("spark.ui.port") != null) {
- System.getProperty("spark.ui.port").toInt
- } else {
- // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
- // random port it bound to, so we have to try to find a local one by creating a socket.
- Utils.findFreePort()
- }
-
- /** Start a HTTP server to run the Web interface */
- def start() {
- try {
- AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
- logInfo("Started BlockManager web UI at http://%s:%d".format(host, port))
- } catch {
- case e: Exception =>
- logError("Failed to create BlockManager WebUI", e)
- System.exit(1)
- }
- }
-
- val handler = {
- get {
- path("") {
- complete {
- // Request the current storage status from the Master
- val storageStatusList = sc.getExecutorStorageStatus
- // Calculate macro-level statistics
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
- } ~
- path("rdd") {
- parameter("id") { id =>
- complete {
- val prefix = "rdd_" + id.toString
- val storageStatusList = sc.getExecutorStorageStatus
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
- }
- }
- } ~
- pathPrefix("static") {
- getFromResourceDirectory(STATIC_RESOURCE_DIR)
- }
- }
- }
-
- private[spark] def appUIAddress = "http://" + host + ":" + port
-}
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
new file mode 100644
index 0000000000..bc6f9c10d5
--- /dev/null
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -0,0 +1,115 @@
+package spark.ui
+
+import annotation.tailrec
+
+import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+
+import net.liftweb.json.{JValue, pretty, render}
+
+import org.eclipse.jetty.server.{Server, Request, Handler}
+import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
+import org.eclipse.jetty.util.thread.QueuedThreadPool
+
+import scala.util.{Try, Success, Failure}
+import scala.xml.Node
+
+import spark.Logging
+
+/** Utilities for launching a web server using Jetty's HTTP Server class */
+private[spark] object JettyUtils extends Logging {
+ // Base type for a function that returns something based on an HTTP request. Allows for
+ // implicit conversion from many types of functions to Jetty Handlers.
+ type Responder[T] = HttpServletRequest => T
+
+ // Conversions from various types of Responders to Jetty Handlers
+ implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
+ createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
+
+ implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
+ createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
+
+ implicit def textResponderToHandler(responder: Responder[String]): Handler =
+ createHandler(responder, "text/plain")
+
+ private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+ extractFn: T => String = (in: Any) => in.toString): Handler = {
+ new AbstractHandler {
+ def handle(target: String,
+ baseRequest: Request,
+ request: HttpServletRequest,
+ response: HttpServletResponse) {
+ response.setContentType("%s;charset=utf-8".format(contentType))
+ response.setStatus(HttpServletResponse.SC_OK)
+ baseRequest.setHandled(true)
+ val result = responder(request)
+ response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+ response.getWriter().println(extractFn(result))
+ }
+ }
+ }
+
+ /** Creates a handler that always redirects the user to a given path */
+ def createRedirectHandler(newPath: String): Handler = {
+ new AbstractHandler {
+ def handle(target: String,
+ baseRequest: Request,
+ request: HttpServletRequest,
+ response: HttpServletResponse) {
+ response.setStatus(302)
+ response.setHeader("Location", baseRequest.getRootURL + newPath)
+ baseRequest.setHandled(true)
+ }
+ }
+ }
+
+ /** Creates a handler for serving files from a static directory */
+ def createStaticHandler(resourceBase: String): ResourceHandler = {
+ val staticHandler = new ResourceHandler
+ Option(getClass.getClassLoader.getResource(resourceBase)) match {
+ case Some(res) =>
+ staticHandler.setResourceBase(res.toString)
+ case None =>
+ throw new Exception("Could not find resource path for Web UI: " + resourceBase)
+ }
+ staticHandler
+ }
+
+ /**
+ * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
+ *
+ * If the desired port number is contended, continues incrementing ports until a free port is
+ * found. Returns the chosen port and the Jetty Server object.
+ */
+ def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
+ val handlersToRegister = handlers.map { case(path, handler) =>
+ val contextHandler = new ContextHandler(path)
+ contextHandler.setHandler(handler)
+ contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
+ }
+
+ val handlerList = new HandlerList
+ handlerList.setHandlers(handlersToRegister.toArray)
+
+ @tailrec
+ def connect(currentPort: Int): (Server, Int) = {
+ val server = new Server(currentPort)
+ val pool = new QueuedThreadPool
+ pool.setDaemon(true)
+ server.setThreadPool(pool)
+ server.setHandler(handlerList)
+
+ Try { server.start() } match {
+ case s: Success[_] =>
+ sys.addShutdownHook(server.stop()) // Be kind, un-bind
+ (server, server.getConnectors.head.getLocalPort)
+ case f: Failure[_] =>
+ server.stop()
+ logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
+ logInfo("Error was: " + f.toString)
+ connect((currentPort + 1) % 65536)
+ }
+ }
+
+ connect(port)
+ }
+}
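
A minimal usage sketch for the API above, assuming it is on the classpath; the /hello path and page body are illustrative, and spark/ui/static is the resource directory SparkUI uses. The implicit conversions let a plain HttpServletRequest => Seq[Node] function be registered directly as a Jetty Handler, and startJettyServer walks upward from the requested port until it binds:

    import javax.servlet.http.HttpServletRequest
    import org.eclipse.jetty.server.Handler
    import scala.xml.Node
    import spark.ui.JettyUtils._

    def helloPage(request: HttpServletRequest): Seq[Node] =
      <h1>Hello from {request.getRequestURI}</h1>

    val handlers = Seq[(String, Handler)](
      ("/static", createStaticHandler("spark/ui/static")),
      ("/hello", (request: HttpServletRequest) => helloPage(request)),
      ("/", createRedirectHandler("/hello"))
    )
    val (server, boundPort) = startJettyServer("0.0.0.0", 33000, handlers)
    println("Bound to port " + boundPort)
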
diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala
new file mode 100644
index 0000000000..c853b44b76
--- /dev/null
+++ b/core/src/main/scala/spark/ui/Page.scala
@@ -0,0 +1,3 @@
+package spark.ui
+
+private[spark] object Page extends Enumeration { val Storage, Jobs = Value } \ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
new file mode 100644
index 0000000000..b3bdc2c490
--- /dev/null
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -0,0 +1,63 @@
+package spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import spark.{Logging, SparkContext, Utils}
+import spark.ui.storage.BlockManagerUI
+import spark.ui.jobs.JobProgressUI
+import spark.ui.UIUtils._
+import spark.ui.JettyUtils._
+
+/** Top level user interface for Spark */
+private[spark] class SparkUI(sc: SparkContext) extends Logging {
+ // TODO(pwendell): It would be nice to add a view that prints out environment information
+
+ val host = Utils.localHostName()
+ val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
+ var boundPort: Option[Int] = None
+ var server: Option[Server] = None
+
+ val handlers = Seq[(String, Handler)](
+ ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
+ ("/", createRedirectHandler("/stages"))
+ )
+ val storage = new BlockManagerUI(sc)
+ val jobs = new JobProgressUI(sc)
+ val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
+
+ /** Bind the HTTP server which backs this web interface */
+ def bind() {
+ try {
+ val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
+ logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
+ server = Some(srv)
+ boundPort = Some(usedPort)
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Spark JettyUtils", e)
+ System.exit(1)
+ }
+ }
+
+ /** Initialize all components of the server */
+ def start() {
+ // NOTE: This is decoupled from bind() because of the following dependency cycle:
+ // DAGScheduler() requires that the port of this server is known
+ // This server must register all handlers, including JobProgressUI, before binding
+ // JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+ jobs.start()
+ }
+
+ def stop() {
+ server.foreach(_.stop())
+ }
+
+ private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+}
+
+private[spark] object SparkUI {
+ val DEFAULT_PORT = "33000"
+ val STATIC_RESOURCE_DIR = "spark/ui/static"
+}
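
A sketch of the lifecycle implied by the comments above, assuming an existing SparkContext named sc; since SparkUI is private[spark], in practice SparkContext itself drives these calls:

    val ui = new spark.ui.SparkUI(sc)
    ui.bind()    // bind early so the chosen port (boundPort) is known to the rest of the context
    // ... finish constructing the SparkContext, including the DAGScheduler ...
    ui.start()   // safe now: JobProgressUI registers its SparkListener with the initialized sc
    println("Web UI at " + ui.appUIAddress)
    // on shutdown:
    ui.stop()
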
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
new file mode 100644
index 0000000000..7b79290d1b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -0,0 +1,113 @@
+package spark.ui
+
+import scala.xml.Node
+
+import spark.SparkContext
+
+/** Utility functions for generating XML pages with spark content. */
+private[spark] object UIUtils {
+ import Page._
+
+ /** Returns a spark page with correctly formatted headers */
+ def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
+ : Seq[Node] = {
+ val storage = page match {
+ case Storage => <li class="active"><a href="/storage">Storage</a></li>
+ case _ => <li><a href="/storage">Storage</a></li>
+ }
+ val jobs = page match {
+ case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
+ case _ => <li><a href="/stages">Jobs</a></li>
+ }
+
+ <html>
+ <head>
+ <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
+ <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
+ <link rel="stylesheet" href="/static/webui.css" type="text/css" />
+ <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
+ <script src="/static/sorttable.js"></script>
+ <title>{title}</title>
+ <style type="text/css">
+ table.sortable thead {{ cursor: pointer; }}
+ </style>
+ </head>
+ <body>
+ <div class="container">
+
+ <div class="row">
+ <div class="span12">
+ <div class="navbar">
+ <div class="navbar-inner">
+ <div class="container">
+ <div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div>
+ <ul class="nav">
+ {storage}
+ {jobs}
+ </ul>
+ <ul id="infolist">
+ <li>Application: <strong>{sc.appName}</strong></li>
+ <li>Master: <strong>{sc.master}</strong></li>
+ <li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li>
+ </ul>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <div class="row" style="padding-top: 5px;">
+ <div class="span12">
+ <h1 style="vertical-align: bottom; display: inline-block;">
+ {title}
+ </h1>
+ </div>
+ </div>
+ <hr/>
+ {content}
+ </div>
+ </body>
+ </html>
+ }
+
+ /** Returns a page with the spark css/js and a simple format. Used for scheduler UI. */
+ def basicSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
+ <html>
+ <head>
+ <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
+ <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
+ <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
+ <script src="/static/sorttable.js"></script>
+ <title>{title}</title>
+ <style type="text/css">
+ table.sortable thead {{ cursor: pointer; }}
+ </style>
+ </head>
+ <body>
+ <div class="container">
+ <div class="row">
+ <div class="span2">
+ <img src="/static/spark_logo.png" />
+ </div>
+ <div class="span10">
+ <h1 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
+ {title}
+ </h1>
+ </div>
+ </div>
+ {content}
+ </div>
+ </body>
+ </html>
+ }
+
+ /** Returns an HTML table constructed by generating a row for each object in a sequence. */
+ def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>{headers.map(h => <th>{h}</th>)}</thead>
+ <tbody>
+ {rows.map(r => makeRow(r))}
+ </tbody>
+ </table>
+ }
+} \ No newline at end of file
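
A short sketch of how the pages below compose these helpers: supply column headers, a row renderer, and a row sequence to listingTable, then wrap the result with headerSparkPage. The WorkerStat type and data are purely illustrative, and sc is assumed to be an existing SparkContext:

    import scala.xml.Node
    import spark.ui.Page
    import spark.ui.UIUtils._

    case class WorkerStat(host: String, tasks: Int)    // illustrative row type
    def statRow(w: WorkerStat): Seq[Node] = <tr><td>{w.host}</td><td>{w.tasks}</td></tr>

    val rows = Seq(WorkerStat("host-a", 12), WorkerStat("host-b", 7))
    val table = listingTable(Seq("Host", "Tasks"), statRow, rows)
    val page = headerSparkPage(table, sc, "Worker Stats", Page.Jobs)
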
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
new file mode 100644
index 0000000000..8bbc6ce88e
--- /dev/null
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -0,0 +1,71 @@
+package spark.ui
+
+import scala.util.Random
+
+import spark.SparkContext
+import spark.SparkContext._
+
+/**
+ * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
+ *
+ * Usage: ./run spark.ui.UIWorkloadGenerator [master]
+ */
+private[spark] object UIWorkloadGenerator {
+ val NUM_PARTITIONS = 100
+ val INTER_JOB_WAIT_MS = 500
+
+ def main(args: Array[String]) {
+ val master = args(0)
+ val appName = "Spark UI Tester"
+ val sc = new SparkContext(master, appName)
+
+ // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
+ // but we pass it here anyway since it will be useful once we do.
+ def setName(s: String) = {
+ sc.addLocalProperties("spark.job.annotation", s)
+ }
+ val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
+ def nextFloat() = (new Random()).nextFloat()
+
+ val jobs = Seq[(String, () => Long)](
+ ("Count", baseData.count),
+ ("Cache and Count", baseData.map(x => x).cache.count),
+ ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
+ ("Entirely failed phase", baseData.map(x => throw new Exception).count),
+ ("Partially failed phase", {
+ baseData.map{x =>
+ val probFailure = (4.0 / NUM_PARTITIONS)
+ if (nextFloat() < probFailure) {
+ throw new Exception("This is a task failure")
+ }
+ 1
+ }.count
+ }),
+ ("Partially failed phase (longer tasks)", {
+ baseData.map{x =>
+ val probFailure = (4.0 / NUM_PARTITIONS)
+ if (nextFloat() < probFailure) {
+ Thread.sleep(100)
+ throw new Exception("This is a task failure")
+ }
+ 1
+ }.count
+ }),
+ ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
+ )
+
+ while (true) {
+ for ((desc, job) <- jobs) {
+ try {
+ setName(desc)
+ job()
+ println("Job funished: " + desc)
+ } catch {
+ case e: Exception =>
+ println("Job Failed: " + desc)
+ }
+ Thread.sleep(INTER_JOB_WAIT_MS)
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
new file mode 100644
index 0000000000..1e675ab2cb
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -0,0 +1,112 @@
+package spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+
+/** Page showing list of all ongoing and recently finished stages */
+private[spark] class IndexPage(parent: JobProgressUI) {
+ def listener = parent.listener
+ val dateFmt = parent.dateFmt
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val activeStages = listener.activeStages.toSeq
+ val completedStages = listener.completedStages.reverse.toSeq
+ val failedStages = listener.failedStages.reverse.toSeq
+
+ /** Special table which merges two header cells. */
+ def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Stage Id</th>
+ <th>Origin</th>
+ <th>Submitted</th>
+ <th>Duration</th>
+ <th colspan="2">Tasks: Complete/Total</th>
+ <th>Shuffle Activity</th>
+ <th>Stored RDD</th>
+ </thead>
+ <tbody>
+ {rows.map(r => makeRow(r))}
+ </tbody>
+ </table>
+ }
+
+ val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
+ val completedStageTable = stageTable(stageRow, completedStages)
+ val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
+
+ val content = <h2>Active Stages</h2> ++ activeStageTable ++
+ <h2>Completed Stages</h2> ++ completedStageTable ++
+ <h2>Failed Stages</h2> ++ failedStageTable
+
+ headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
+ }
+
+ def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ submitted match {
+ case Some(t) => parent.formatDuration(completed - t)
+ case _ => "Unknown"
+ }
+ }
+
+ def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
+ val width = 130
+ val height = 15
+ val completeWidth = (completed.toDouble / total) * width
+
+ <svg width={width.toString} height={height.toString}>
+ <rect width={width.toString} height={height.toString}
+ fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
+ <rect width={completeWidth.toString} height={height.toString}
+ fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
+ </svg>
+ }
+
+
+ def stageRow(s: Stage): Seq[Node] = {
+ val submissionTime = s.submissionTime match {
+ case Some(t) => dateFmt.format(new Date(t))
+ case None => "Unknown"
+ }
+ val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
+ val shuffleInfo = (read, write) match {
+ case (true, true) => "Read/Write"
+ case (true, false) => "Read"
+ case (false, true) => "Write"
+ case _ => ""
+ }
+ val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+ val totalTasks = s.numPartitions
+
+ <tr>
+ <td>{s.id}</td>
+ <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
+ <td>{submissionTime}</td>
+ <td>{getElapsedTime(s.submissionTime,
+ s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
+ <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
+ {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+ case f if f > 0 => "(%s failed)".format(f)
+ case _ =>
+ }}
+ </td>
+ <td>{shuffleInfo}</td>
+ <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
+ <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
+ {Option(s.rdd.name).getOrElse(s.rdd.id)}
+ </a>
+ }}
+ </td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
new file mode 100644
index 0000000000..c5d2aa5d60
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -0,0 +1,127 @@
+package spark.ui.jobs
+
+import scala.concurrent.duration._
+
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.ui.JettyUtils._
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[spark] class JobProgressUI(val sc: SparkContext) {
+ private var _listener: Option[JobProgressListener] = None
+ def listener = _listener.get
+ val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+ private val indexPage = new IndexPage(this)
+ private val stagePage = new StagePage(this)
+
+ def start() {
+ _listener = Some(new JobProgressListener)
+ sc.addSparkListener(listener)
+ }
+
+ def formatDuration(ms: Long) = Utils.msDurationToString(ms)
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+ ("/stages", (request: HttpServletRequest) => indexPage.render(request))
+ )
+}
+
+private[spark] class JobProgressListener extends SparkListener {
+ // How many stages to remember
+ val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+
+ val activeStages = HashSet[Stage]()
+ val completedStages = ListBuffer[Stage]()
+ val failedStages = ListBuffer[Stage]()
+
+ val stageToTasksComplete = HashMap[Int, Int]()
+ val stageToTasksFailed = HashMap[Int, Int]()
+ val stageToTaskInfos =
+ HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+ override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ val stage = stageCompleted.stageInfo.stage
+ activeStages -= stage
+ completedStages += stage
+ trimIfNecessary(completedStages)
+ }
+
+ /** If the stage list grows too large, remove old stages so they can be garbage collected. */
+ def trimIfNecessary(stages: ListBuffer[Stage]) {
+ if (stages.size > RETAINED_STAGES) {
+ val toRemove = RETAINED_STAGES / 10
+ stages.takeRight(toRemove).foreach( s => {
+ stageToTaskInfos.remove(s.id)
+ })
+ stages.trimEnd(toRemove)
+ }
+ }
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
+ activeStages += stageSubmitted.stage
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val sid = taskEnd.task.stageId
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ (Some(e), e.metrics)
+ case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ (None, Some(taskEnd.taskMetrics))
+ }
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ stageToTaskInfos(sid) = taskList
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+
+ /** Returns true if this stage's input was read from a shuffle. */
+ def hasShuffleRead(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+
+ /** Returns true if this stage's output was written to a shuffle. */
+ def hasShuffleWrite(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+}
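
The retention bound above is read from a system property when the listener is created, so it has to be set before the SparkContext (and hence the UI) is constructed; a small illustrative example:

    // Keep at most 200 completed/failed stages in the UI; older ones are trimmed and
    // their per-task info dropped so they can be garbage collected.
    System.setProperty("spark.ui.retained_stages", "200")
    val sc = new spark.SparkContext("local[2]", "ui-retention-demo")
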
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
new file mode 100644
index 0000000000..51b82b6a8c
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -0,0 +1,114 @@
+package spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.util.Distribution
+import spark.{ExceptionFailure, Utils}
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+
+/** Page showing statistics and task list for a given stage */
+private[spark] class StagePage(parent: JobProgressUI) {
+ def listener = parent.listener
+ val dateFmt = parent.dateFmt
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stageId = request.getParameter("id").toInt
+
+ if (!listener.stageToTaskInfos.contains(stageId)) {
+ val content =
+ <div>
+ <h2>Summary Metrics</h2> No tasks have finished yet
+ <h2>Tasks</h2> No tasks have finished yet
+ </div>
+ return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
+
+ val tasks = listener.stageToTaskInfos(stageId)
+
+ val shuffleRead = listener.hasShuffleRead(stageId)
+ val shuffleWrite = listener.hasShuffleWrite(stageId)
+
+ val taskHeaders: Seq[String] =
+ Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
+ {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("Details")
+
+ val taskTable = listingTable(taskHeaders, taskRow, tasks)
+
+ // Exclude tasks that failed and therefore have incomplete metrics
+ val validTasks = tasks.filter(t => Option(t._2).isDefined)
+
+ val summaryTable: Option[Seq[Node]] =
+ if (validTasks.size == 0) {
+ None
+ }
+ else {
+ val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.executorRunTime.toDouble}
+ val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
+ def getQuantileCols(data: Seq[Double]) =
+ Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+
+ val shuffleReadSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+
+ val shuffleWriteSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ }
+ val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+
+ val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ if (shuffleRead) shuffleReadQuantiles else Nil,
+ if (shuffleWrite) shuffleWriteQuantiles else Nil)
+
+ val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+ def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+ Some(listingTable(quantileHeaders, quantileRow, listings))
+ }
+
+ val content =
+ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
+
+ headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
+
+
+ def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
+ def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
+ trace.map(e => <span style="display:block;">{e.toString}</span>)
+ val (info, metrics, exception) = taskData
+ <tr>
+ <td>{info.taskId}</td>
+ <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
+ {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ </td>
+ <td>{info.taskLocality}</td>
+ <td>{info.hostPort}</td>
+ <td>{dateFmt.format(new Date(info.launchTime))}</td>
+ {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
+ <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
+ {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
+ <td>{exception.map(e =>
+ <span>
+ {e.className} ({e.description})<br/>
+ {fmtStackTrace(e.stackTrace)}
+ </span>).getOrElse("")}
+ </td>
+ </tr>
+ }
+}
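
The summary table above leans on spark.util.Distribution: with the default probabilities, getQuantiles() returns five values that line up with the Min/25%/50%/75%/Max columns. A small sketch with illustrative timings:

    import spark.util.Distribution

    val serviceTimes = Seq(95.0, 120.0, 210.0, 340.0, 780.0)        // illustrative run times in ms
    val quantiles = Distribution(serviceTimes).get.getQuantiles()   // min, 25%, median, 75%, max
    quantiles.foreach(ms => println(ms.toLong + " ms"))
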
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000..252600aad4
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
@@ -0,0 +1,24 @@
+package spark.ui.storage
+
+import scala.concurrent.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.{Logging, SparkContext}
+import spark.ui.JettyUtils._
+
+/** Web UI showing storage status of all RDD's in the given SparkContext. */
+private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
+ val indexPage = new IndexPage(this)
+ val rddPage = new RDDPage(this)
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
+ ("/storage", (request: HttpServletRequest) => indexPage.render(request))
+ )
+}
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
new file mode 100644
index 0000000000..d284134391
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -0,0 +1,64 @@
+package spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import spark.storage.{RDDInfo, StorageUtils}
+import spark.Utils
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing list of RDD's currently stored in the cluster */
+private[spark] class IndexPage(parent: BlockManagerUI) {
+ val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val storageStatusList = sc.getExecutorStorageStatus
+ // Calculate macro-level statistics
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+
+ val rddHeaders = Seq(
+ "RDD Name",
+ "Storage Level",
+ "Cached Partitions",
+ "Fraction Partitions Cached",
+ "Size in Memory",
+ "Size on Disk")
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ val rddTable = listingTable(rddHeaders, rddRow, rdds)
+
+ val content =
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Memory:</strong>
+ {Utils.memoryBytesToString(maxMem - remainingMem)} Used
+ ({Utils.memoryBytesToString(remainingMem)} Available) </li>
+ <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+ </ul>
+ </div>
+ </div> ++ {rddTable};
+
+ headerSparkPage(content, parent.sc, "Spark Storage ", Storage)
+ }
+
+ def rddRow(rdd: RDDInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ {rdd.name}
+ </a>
+ </td>
+ <td>{rdd.storageLevel.description}
+ </td>
+ <td>{rdd.numCachedPartitions}</td>
+ <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
+ <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
+ <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
new file mode 100644
index 0000000000..0cb1e47ea5
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -0,0 +1,104 @@
+package spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import spark.storage.{StorageStatus, StorageUtils}
+import spark.ui.UIUtils._
+import spark.Utils
+import spark.storage.BlockManagerMasterActor.BlockStatus
+import spark.ui.Page._
+
+/** Page showing storage details for a given RDD */
+private[spark] class RDDPage(parent: BlockManagerUI) {
+ val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val id = request.getParameter("id")
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getExecutorStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+
+ val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+ val workers = filteredStorageStatusList.map((prefix, _))
+ val workerTable = listingTable(workerHeaders, workerRow, workers)
+
+ val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk")
+ val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockTable = listingTable(blockHeaders, blockRow, blocks)
+
+ val content =
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li>
+ <strong>Storage Level:</strong>
+ {rddInfo.storageLevel.description}
+ </li>
+ <li>
+ <strong>Cached Partitions:</strong>
+ {rddInfo.numCachedPartitions}
+ </li>
+ <li>
+ <strong>Total Partitions:</strong>
+ {rddInfo.numPartitions}
+ </li>
+ <li>
+ <strong>Memory Size:</strong>
+ {Utils.memoryBytesToString(rddInfo.memSize)}
+ </li>
+ <li>
+ <strong>Disk Size:</strong>
+ {Utils.memoryBytesToString(rddInfo.diskSize)}
+ </li>
+ </ul>
+ </div>
+ </div>
+ <hr/>
+ <div class="row">
+ <div class="span12">
+ {workerTable}
+ </div>
+ </div>
+ <hr/>
+ <div class="row">
+ <div class="span12">
+ <h3> RDD Summary </h3>
+ {blockTable}
+ </div>
+ </div>;
+
+ headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
+ }
+
+ def blockRow(blk: (String, BlockStatus)): Seq[Node] = {
+ val (id, block) = blk
+ <tr>
+ <td>{id}</td>
+ <td>
+ {block.storageLevel.description}
+ </td>
+ <td sorttable_customkey={block.memSize.toString}>
+ {Utils.memoryBytesToString(block.memSize)}
+ </td>
+ <td sorttable_customkey={block.diskSize.toString}>
+ {Utils.memoryBytesToString(block.diskSize)}
+ </td>
+ </tr>
+ }
+
+ def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
+ val (prefix, status) = worker
+ <tr>
+ <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+ <td>
+ {Utils.memoryBytesToString(status.memUsed(prefix))}
+ ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+ </td>
+ <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
+ </tr>
+ }
+}
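
The per-RDD view relies on the block-naming convention rdd_<rddId>_<partition>, so filtering the executor storage status by the rdd_<id> prefix isolates one RDD's blocks. A sketch of the same lookup outside the page, assuming an existing SparkContext named sc and an illustrative RDD id:

    import spark.storage.StorageUtils

    val rddId = 3                                                   // illustrative
    val prefix = "rdd_" + rddId
    val filtered = StorageUtils.filterStorageStatusByPrefix(sc.getExecutorStorageStatus, prefix)
    val info = StorageUtils.rddInfoFromStorageStatus(filtered, sc).head
    println(info.name + ": " + info.numCachedPartitions + "/" + info.numPartitions + " partitions cached")
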
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index ea39888c21..c381a0510b 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,20 +1,10 @@
package spark.util
-import akka.actor.{Props, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
-import akka.pattern.ask
-import akka.remote.RemoteActorRefProvider
-
-import spray.routing.Route
-import spray.io.IOExtension
-import spray.routing.HttpServiceActor
-import spray.can.server.{HttpServer, ServerSettings}
-import spray.io.SingletonHandler
import scala.concurrent.Await
-import spark.{Utils, SparkException}
-
-import java.util.concurrent.TimeoutException
+import akka.remote.RemoteActorRefProvider
/**
* Various utility classes for working with Akka.
@@ -65,29 +55,4 @@ private[spark] object AkkaUtils {
return (actorSystem, boundPort)
}
- /**
- * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
- * handle requests. Returns the bound port or throws a SparkException on failure.
- * TODO: Not changing ip to host here - is it required ?
- */
- def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, name: String = "HttpServer") = {
- val ioWorker = IOExtension(actorSystem).ioBridge()
- val httpService = actorSystem.actorOf(Props(HttpServiceActor(route)))
- val server = actorSystem.actorOf(
- Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = name)
- actorSystem.registerOnTermination { actorSystem.stop(ioWorker) }
- val timeout = 3.seconds
- val future = server.ask(HttpServer.Bind(ip, port))(timeout)
- try {
- Await.result(future, timeout) match {
- case bound: HttpServer.Bound =>
- server
- case other: Any =>
- throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
- }
- } catch {
- case e: TimeoutException =>
- throw new SparkException("Failed to bind web UI to port " + port)
- }
- }
}