aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Timothy Chen <tnachen@gmail.com>, 2015-05-01 18:36:42 -0700
committer: Andrew Or <andrew@databricks.com>, 2015-05-01 18:36:42 -0700
commit: 2022193412e832393a29b94609841c3ffe8e3d66 (patch)
tree: d4b355562c135caef634b443c41601997a59619d /core
parent: 099327d5376554134c9af49bc2045add4cfb024d (diff)
download: spark-2022193412e832393a29b94609841c3ffe8e3d66.tar.gz
spark-2022193412e832393a29b94609841c3ffe8e3d66.tar.bz2
spark-2022193412e832393a29b94609841c3ffe8e3d66.zip
[SPARK-7216] [MESOS] Add driver details page to Mesos cluster UI.
Add a details page that displays Mesos driver in the Mesos cluster UI

Author: Timothy Chen <tnachen@gmail.com>

Closes #5763 from tnachen/mesos_cluster_page and squashes the following commits:

55f36eb [Timothy Chen] Add driver details page to Mesos cluster UI.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala | 180
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala | 9
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala | 1
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 33
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 4
6 files changed, 222 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
new file mode 100644
index 0000000000..be8560d10f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.{MesosClusterSubmissionState, MesosClusterRetryState}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+
+/**
+ * Web UI page that shows the details of a single driver known to the Mesos cluster scheduler.
+ *
+ * The page is attached under the "driver" prefix and expects the driver's submission id in
+ * the `id` request parameter.
+ */
+private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
+
+  /**
+   * Renders the detail tables for the driver named by the `id` request parameter.
+   *
+   * Fails the `require` check (IllegalArgumentException) when `id` is missing or empty. When
+   * the scheduler has no state for the id, a short "Cannot find driver" page is returned.
+   */
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val driverId = request.getParameter("id")
+    require(driverId != null && driverId.nonEmpty, "Missing id parameter")
+    val title = s"Details for Job $driverId"
+
+    parent.scheduler.getDriverState(driverId) match {
+      case None =>
+        val content =
+          <div>
+            <p>Cannot find driver {driverId}</p>
+          </div>
+        UIUtils.basicSparkPage(content, title)
+
+      case Some(driverState) =>
+        val driverHeaders = Seq("Driver property", "Value")
+        val schedulerHeaders = Seq("Scheduler property", "Value")
+        val commandEnvHeaders = Seq("Command environment variable", "Value")
+        val launchedHeaders = Seq("Launched property", "Value")
+        val commandHeaders = Seq("Command property", "Value")
+        val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
+
+        // Each table is fed a single-element Iterable; the row functions expand that one
+        // element into several <tr> rows.
+        val driverDescription = Iterable(driverState.description)
+        val submissionState = Iterable(driverState.submissionState)
+        val command = Iterable(driverState.description.command)
+        val schedulerProperties = Iterable(driverState.description.schedulerProperties)
+        val commandEnv = Iterable(driverState.description.command.environment)
+        val retryState = Iterable(driverState.description.retryState)
+
+        val driverTable =
+          UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
+        val commandTable =
+          UIUtils.listingTable(commandHeaders, commandRow, command)
+        val commandEnvTable =
+          UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
+        val schedulerTable =
+          UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
+        val launchedTable =
+          UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
+        val retryTable =
+          UIUtils.listingTable(retryHeaders, retryRow, retryState)
+
+        val content =
+          <p>Driver state information for driver id {driverId}</p>
+          <a href="/">Back to Drivers</a>
+          <div class="row-fluid">
+            <div class="span12">
+              <h4>Driver state: {driverState.state}</h4>
+              <h4>Driver properties</h4>
+              {driverTable}
+              <h4>Driver command</h4>
+              {commandTable}
+              <h4>Driver command environment</h4>
+              {commandEnvTable}
+              <h4>Scheduler properties</h4>
+              {schedulerTable}
+              <h4>Launched state</h4>
+              {launchedTable}
+              <h4>Retry state</h4>
+              {retryTable}
+            </div>
+          </div>;
+
+        UIUtils.basicSparkPage(content, title)
+    }
+  }
+
+  /** Rows describing the launched Mesos task; no rows when the driver has no submission state. */
+  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
+    submissionState.map { state =>
+      <tr>
+        <td>Mesos Slave ID</td>
+        <td>{state.slaveId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Mesos Task ID</td>
+        <td>{state.taskId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Launch Time</td>
+        <td>{state.startDate}</td>
+      </tr>
+      <tr>
+        <td>Finish Time</td>
+        <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
+      </tr>
+      <tr>
+        <td>Last Task Status</td>
+        <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
+      </tr>
+    }.getOrElse(Seq.empty[Node])
+  }
+
+  /** One key/value row per entry, sorted by key so the display order is stable across renders. */
+  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
+    properties.toSeq.sortBy(_._1).map { case (k, v) =>
+      <tr>
+        <td>{k}</td><td>{v}</td>
+      </tr>
+    }
+  }
+
+  /** Rows for the driver's launch command; list-valued fields are joined with single spaces. */
+  private def commandRow(command: Command): Seq[Node] = {
+    <tr>
+      <td>Main class</td><td>{command.mainClass}</td>
+    </tr>
+    <tr>
+      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Java options</td><td>{command.javaOpts.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Library path entries</td><td>{command.libraryPathEntries.mkString(" ")}</td>
+    </tr>
+  }
+
+  /** Rows for the fields of the submitted driver description. */
+  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
+    <tr>
+      <td>Name</td><td>{driver.name}</td>
+    </tr>
+    <tr>
+      <td>Id</td><td>{driver.submissionId}</td>
+    </tr>
+    <tr>
+      <td>Cores</td><td>{driver.cores}</td>
+    </tr>
+    <tr>
+      <td>Memory</td><td>{driver.mem}</td>
+    </tr>
+    <tr>
+      <td>Submitted</td><td>{driver.submissionDate}</td>
+    </tr>
+    <tr>
+      <td>Supervise</td><td>{driver.supervise}</td>
+    </tr>
+  }
+
+  /** A single row with the driver's retry state; no rows when there is no retry state. */
+  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
+    retryState.map { state =>
+      <tr>
+        <td>{state.lastFailureStatus}</td>
+        <td>{state.nextRetry}</td>
+        <td>{state.retries}</td>
+      </tr>
+    }.getOrElse(Seq.empty[Node])
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
index 7b2005e0f1..7419fa9699 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -56,8 +56,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
}
private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+ val id = submission.submissionId
<tr>
- <td>{submission.submissionId}</td>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>cpus: {submission.cores}, mem: {submission.mem}</td>
@@ -65,8 +66,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
}
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+ val id = state.driverDescription.submissionId
<tr>
- <td>{state.driverDescription.submissionId}</td>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
@@ -77,8 +79,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
}
private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+ val id = submission.submissionId
<tr>
- <td>{submission.submissionId}</td>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>{submission.retryState.get.lastFailureStatus}</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index 4865d46dbc..3f693545a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -39,6 +39,7 @@ private[spark] class MesosClusterUI(
override def initialize() {
attachPage(new MesosClusterPage(this))
+ attachPage(new DriverPage(this))
attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index fd17a980c9..8198296eeb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -53,7 +53,7 @@ private[spark] class MesosRestServer(
new MesosStatusRequestServlet(scheduler, masterConf)
}
-private[deploy] class MesosSubmitRequestServlet(
+private[mesos] class MesosSubmitRequestServlet(
scheduler: MesosClusterScheduler,
conf: SparkConf)
extends SubmitRequestServlet {
@@ -139,7 +139,7 @@ private[deploy] class MesosSubmitRequestServlet(
}
}
-private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends KillRequestServlet {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = scheduler.killDriver(submissionId)
@@ -148,7 +148,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler,
}
}
-private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends StatusRequestServlet {
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val d = scheduler.getDriverStatus(submissionId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0396e62be5..06f0e2881c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -50,12 +50,13 @@ private[spark] class MesosClusterSubmissionState(
val taskId: TaskID,
val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus],
- var startDate: Date)
+ var startDate: Date,
+ var finishDate: Option[Date])
extends Serializable {
def copy(): MesosClusterSubmissionState = {
new MesosClusterSubmissionState(
- driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
+ driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
}
}
@@ -96,6 +97,14 @@ private[spark] class MesosClusterSchedulerState(
val pendingRetryDrivers: Iterable[MesosDriverDescription])
/**
+ * The full state of a Mesos driver, used to display driver information on the UI.
+ *
+ * @param state label for the driver's current stage; getDriverState in this file passes
+ *              "QUEUED", "RUNNING", "FINISHED" or "RETRYING"
+ * @param description the driver's MesosDriverDescription
+ * @param submissionState launch details of the driver; defaults to None, and getDriverState
+ *                        supplies it only for running and finished drivers
+ */
+private[spark] class MesosDriverState(
+ val state: String,
+ val description: MesosDriverDescription,
+ val submissionState: Option[MesosClusterSubmissionState] = None)
+
+/**
* A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
* as Mesos tasks in a Mesos cluster.
* All drivers are launched asynchronously by the framework, which will eventually be launched
@@ -233,6 +242,22 @@ private[spark] class MesosClusterScheduler(
s
}
+  /**
+   * Looks up a driver by submission id for display on the Web UI.
+   *
+   * Queued drivers are consulted first, then launched, finished and pending-retry drivers;
+   * the first collection holding the id decides the reported state label. Returns None when
+   * none of them holds the id. The whole lookup runs while holding stateLock.
+   */
+  def getDriverState(submissionId: String): Option[MesosDriverState] = stateLock.synchronized {
+    def hasId(desc: MesosDriverDescription): Boolean = desc.submissionId.equals(submissionId)
+
+    // Declared as defs so that, as with a chained orElse, later lookups only run when the
+    // earlier ones found nothing.
+    def queued = queuedDrivers.find(hasId)
+      .map(desc => new MesosDriverState("QUEUED", desc))
+    def running = launchedDrivers.get(submissionId)
+      .map(sub => new MesosDriverState("RUNNING", sub.driverDescription, Some(sub)))
+    def finished = finishedDrivers.find(sub => hasId(sub.driverDescription))
+      .map(sub => new MesosDriverState("FINISHED", sub.driverDescription, Some(sub)))
+    def retrying = pendingRetryDrivers.find(hasId)
+      .map(desc => new MesosDriverState("RETRYING", desc))
+
+    queued.orElse(running).orElse(finished).orElse(retrying)
+  }
+
private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
/**
@@ -439,7 +464,7 @@ private[spark] class MesosClusterScheduler(
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
- None, new Date())
+ None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
@@ -534,6 +559,7 @@ private[spark] class MesosClusterScheduler(
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
removeFromLaunchedDrivers(taskId)
+ state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
@@ -546,6 +572,7 @@ private[spark] class MesosClusterScheduler(
pendingRetryDriversState.persist(taskId, newDriverDescription)
} else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
removeFromLaunchedDrivers(taskId)
+ state.finishDate = Some(new Date())
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
finishedDrivers.trimStart(toRemove)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 8346a24074..86a7d0fb58 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.executor.MesosExecutorBackend
@@ -56,7 +56,7 @@ private[spark] class MesosSchedulerBackend(
// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus
-
+
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
@volatile var appId: String = _