author    Michael Gummelt <mgummelt@mesosphere.io>  2016-08-09 10:55:33 +0100
committer Sean Owen <sowen@cloudera.com>            2016-08-09 10:55:33 +0100
commit    62e62124419f3fa07b324f5e42feb2c5b4fde715 (patch)
tree      a9ad17839a122ab0228c557d432d3ec8b073db89
parent    2154345b6a1cb193b1380ab386912e478928c6b2 (diff)
[SPARK-16809] enable history server links in dispatcher UI
## What changes were proposed in this pull request?

Links the Spark Mesos Dispatcher UI to the history server UI:

- adds `spark.mesos.dispatcher.historyServer.url`
- explicitly generates frameworkIDs for the launched drivers, so the dispatcher knows how to correlate drivers and frameworkIDs

## How was this patch tested?

Manual testing.

Author: Michael Gummelt <mgummelt@mesosphere.io>
Author: Sergiusz Urbaniak <sur@mesosphere.io>

Closes #14414 from mgummelt/history-server.
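The mechanism at the heart of the patch: the dispatcher derives each driver's framework ID deterministically from its own framework ID plus the submission ID, so it can build a history server link without waiting for Mesos to assign one. A minimal standalone sketch of that scheme (the names and values below are illustrative, not the dispatcher's actual fields):

```scala
// Minimal sketch of the correlation scheme (illustrative names and values).
object FrameworkIdSketch {
  // Deterministic per-driver framework ID: dispatcher framework ID + submission ID.
  def driverFrameworkId(dispatcherFrameworkId: String, submissionId: String): String =
    s"$dispatcherFrameworkId-$submissionId"

  // History server entries are keyed by framework ID, so the link is a path join.
  def historyLink(historyServerUrl: String, frameworkId: String): String =
    s"$historyServerUrl/history/$frameworkId"

  def main(args: Array[String]): Unit = {
    val fwId = driverFrameworkId("dispatcher-fw-0001", "driver-20160809105533-0007")
    println(historyLink("http://history.example.com:18080", fwId))
    // http://history.example.com:18080/history/dispatcher-fw-0001-driver-20160809105533-0007
  }
}
```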
core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala                           | 21
core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala                             |  2
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala              | 27
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala |  7
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala   |  7
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala                | 11
docs/running-on-mesos.md                                                                              | 10
7 files changed, 75 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
index 166f666fbc..8dcbdaad86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
import org.apache.spark.ui.{UIUtils, WebUIPage}
private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+ private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
def render(request: HttpServletRequest): Seq[Node] = {
val state = parent.scheduler.getSchedulerState()
- val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
- val driverHeaders = queuedHeaders ++
+
+ val driverHeader = Seq("Driver ID")
+ val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
+ val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
+
+ val queuedHeaders = driverHeader ++ submissionHeader
+ val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
Seq("Start Date", "Mesos Slave ID", "State")
val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
@@ -68,8 +75,18 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId
+
+ val historyCol = if (historyServerURL.isDefined) {
+ <td>
+ <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
+ {state.frameworkId}
+ </a>
+ </td>
+ } else Nil
+
<tr>
<td><a href={s"driver?id=$id"}>{id}</a></td>
+ {historyCol}
<td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index baad098a0c..604978967d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ui.JettyUtils._
private[spark] class MesosClusterUI(
securityManager: SecurityManager,
port: Int,
- conf: SparkConf,
+ val conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index ae531e1997..2189fca67a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -43,6 +43,8 @@ import org.apache.spark.util.Utils
* @param slaveId Slave ID that the task is assigned to
* @param mesosTaskStatus The last known task status update.
* @param startDate The date the task was launched
+ * @param finishDate The date the task finished
+ * @param frameworkId Mesos framework ID the task registers with
*/
private[spark] class MesosClusterSubmissionState(
val driverDescription: MesosDriverDescription,
@@ -50,12 +52,13 @@ private[spark] class MesosClusterSubmissionState(
val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus],
var startDate: Date,
- var finishDate: Option[Date])
+ var finishDate: Option[Date],
+ val frameworkId: String)
extends Serializable {
def copy(): MesosClusterSubmissionState = {
new MesosClusterSubmissionState(
- driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
+ driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
}
}
@@ -63,6 +66,7 @@ private[spark] class MesosClusterSubmissionState(
* Tracks the retry state of a driver, which includes the next time it should be scheduled
* and necessary information to do exponential backoff.
* This class is not thread-safe, and we expect the caller to handle synchronizing state.
+ *
* @param lastFailureStatus Last Task status when it failed.
* @param retries Number of times it has been retried.
* @param nextRetry Time at which it should be retried next
@@ -80,6 +84,7 @@ private[spark] class MesosClusterRetryState(
/**
* The full state of the cluster scheduler, currently being used for displaying
* information on the UI.
+ *
* @param frameworkId Mesos Framework id for the cluster scheduler.
* @param masterUrl The Mesos master url
* @param queuedDrivers All drivers queued to be launched
@@ -355,7 +360,15 @@ private[spark] class MesosClusterScheduler(
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
desc.conf.getOption("spark.executor.uri")
- .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ }
+
+ private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+ m.updated(k, f(m.getOrElse(k, default)))
+ }
+
+ private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
+ s"${frameworkId}-${desc.submissionId}"
}
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
@@ -364,7 +377,11 @@ private[spark] class MesosClusterScheduler(
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
- driverEnv ++ executorEnv ++ desc.command.environment
+ var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+ v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+ )
+
+ driverEnv ++ executorEnv ++ commandEnv
}
val envBuilder = Environment.newBuilder()
@@ -552,7 +569,7 @@ private[spark] class MesosClusterScheduler(
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
- None, new Date(), None)
+ None, new Date(), None, getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
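For clarity, a standalone sketch of the `adjust` helper's semantics with hypothetical environment values (the scheduler's version is identical apart from operating on `collection.Map`):

```scala
// Upsert semantics: apply f to the existing value, or to the default if absent.
def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B): Map[A, B] =
  m.updated(k, f(m.getOrElse(k, default)))

val env = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")

// Appends the flag to an existing entry:
adjust(env, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dspark.mesos.driver.frameworkId=fw-0001")
// Map(SPARK_SUBMIT_OPTS -> "-Xmx1g -Dspark.mesos.driver.frameworkId=fw-0001")

// Falls back to the default when the key is absent, so the flag is still set:
adjust(Map.empty[String, String], "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dfoo=bar")
// Map(SPARK_SUBMIT_OPTS -> " -Dfoo=bar")
```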
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5177557132..0933a03a0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
- sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
+ sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+ None,
+ None,
+ sc.conf.getOption("spark.mesos.driver.frameworkId")
)
+
+ unsetFrameworkID(sc)
startScheduler(driver)
}
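The two `None` arguments fill intervening optional parameters of `createSchedulerDriver` so the new `frameworkId` option can be passed. A hedged sketch of what threading that ID through to Mesos registration looks like (the wiring below is illustrative, not the file's exact code; the protobuf builder calls are the standard Mesos Java API):

```scala
import org.apache.spark.SparkConf
import org.apache.mesos.Protos.{FrameworkID, FrameworkInfo}

// Illustrative: pre-set the dispatcher-assigned ID on the FrameworkInfo so the
// driver registers with Mesos under an ID the dispatcher already knows about.
def buildFrameworkInfo(conf: SparkConf, user: String, appName: String): FrameworkInfo = {
  val builder = FrameworkInfo.newBuilder()
    .setUser(user)
    .setName(appName)
  conf.getOption("spark.mesos.driver.frameworkId").foreach { id =>
    builder.setId(FrameworkID.newBuilder().setValue(id))
  }
  builder.build()
}
```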
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index d8d661da31..f1e48fa7c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -77,8 +77,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
- sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
+ sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+ Option.empty,
+ Option.empty,
+ sc.conf.getOption("spark.mesos.driver.frameworkId")
)
+
+ unsetFrameworkID(sc)
startScheduler(driver)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index cd4b45f8de..81db789166 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -357,4 +357,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
}
+ /**
+ * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
+ * submissions with frameworkIDs. However, this causes issues when a driver process launches
+ * more than one framework (i.e., more than one SparkContext), because they all try to register with
+ * the same frameworkID. To enforce that only the first driver registers with the configured
+ * framework ID, the driver calls this method after the first registration.
+ */
+ def unsetFrameworkID(sc: SparkContext) {
+ sc.conf.remove("spark.mesos.driver.frameworkId")
+ System.clearProperty("spark.mesos.driver.frameworkId")
+ }
}
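The double clear (SparkConf plus system property) matters because a fresh `SparkConf` reloads `spark.*` JVM system properties, which is exactly how a second SparkContext in the same driver would pick the ID back up. A minimal sketch of the semantics (standalone, no SparkContext needed; the key's value is hypothetical):

```scala
import org.apache.spark.SparkConf

val key = "spark.mesos.driver.frameworkId"
System.setProperty(key, "fw-0001-driver-0007")   // as the dispatcher's SPARK_SUBMIT_OPTS would
val conf = new SparkConf()                       // loads spark.* system properties
assert(conf.getOption(key).isDefined)

// What unsetFrameworkID does: clear both places.
conf.remove(key)
System.clearProperty(key)
assert(conf.getOption(key).isEmpty)
assert(new SparkConf().getOption(key).isEmpty)   // a second context won't reuse the ID
```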
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index d037e7be0a..613da68531 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -468,6 +468,16 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ Set the URL of the <a href="http://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact">history
+ server</a>. The dispatcher will then link each driver to its entry
+ in the history server.
+ </td>
+</tr>
+
</table>
# Troubleshooting and Debugging
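Taken together, a dispatcher configured with this property renders a History column whose links follow the `<url>/history/<frameworkId>` pattern built in MesosClusterPage above. A small sketch of how the setting is consumed (the URL and framework ID are hypothetical):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.dispatcher.historyServer.url", "http://history.example.com:18080")

// Mirrors MesosClusterPage: render the link only when the URL is configured.
val historyServerURL = conf.getOption("spark.mesos.dispatcher.historyServer.url")
val frameworkId = "dispatcher-fw-0001-driver-20160809105533-0007"
historyServerURL.foreach { url =>
  println(s"$url/history/$frameworkId")
  // http://history.example.com:18080/history/dispatcher-fw-0001-driver-20160809105533-0007
}
```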