From 62e62124419f3fa07b324f5e42feb2c5b4fde715 Mon Sep 17 00:00:00 2001
From: Michael Gummelt
Date: Tue, 9 Aug 2016 10:55:33 +0100
Subject: [SPARK-16809] enable history server links in dispatcher UI

## What changes were proposed in this pull request?

Links the Spark Mesos Dispatcher UI to the history server UI:

- adds spark.mesos.dispatcher.historyServer.url
- explicitly generates frameworkIDs for the launched drivers, so the
  dispatcher knows how to correlate drivers and frameworkIDs

## How was this patch tested?

Manual testing.

Author: Michael Gummelt
Author: Sergiusz Urbaniak

Closes #14414 from mgummelt/history-server.
---
 .../spark/deploy/mesos/ui/MesosClusterPage.scala   | 21 +++++++++++++++--
 .../spark/deploy/mesos/ui/MesosClusterUI.scala     |  2 +-
 .../cluster/mesos/MesosClusterScheduler.scala      | 27 ++++++++++++++++++----
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |  7 +++++-
 .../mesos/MesosFineGrainedSchedulerBackend.scala   |  7 +++++-
 .../cluster/mesos/MesosSchedulerUtils.scala        | 11 +++++++++
 docs/running-on-mesos.md                           | 10 ++++++++
 7 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
index 166f666fbc..8dcbdaad86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val state = parent.scheduler.getSchedulerState()
-    val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
-    val driverHeaders = queuedHeaders ++
+
+    val driverHeader = Seq("Driver ID")
+    val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
+    val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
+
+    val queuedHeaders = driverHeader ++ submissionHeader
+    val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
       Seq("Start Date", "Mesos Slave ID", "State")
     val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
       Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
@@ -68,8 +75,18 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
 
   private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
     val id = state.driverDescription.submissionId
+
+    val historyCol = if (historyServerURL.isDefined) {
+      <td>
+        <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
+          {state.frameworkId}
+        </a>
+      </td>
+    } else Nil
+
     <tr>
       <td>{id}</td>
+      {historyCol}
       <td>{state.driverDescription.submissionDate}</td>
       <td>{state.driverDescription.command.mainClass}</td>
       <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
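Note on the change above: the "History" column is rendered conditionally, so the header row and the driver rows stay aligned whether or not the dispatcher is configured with a history server. A minimal, self-contained sketch of that pattern; the URL and framework ID below are made-up placeholder values:

```scala
import scala.xml.Node

object HistoryColumnSketch {
  // Placeholder: in the dispatcher this comes from
  // parent.conf.getOption("spark.mesos.dispatcher.historyServer.url").
  val historyServerURL: Option[String] = Some("http://history.example.com:18080")

  // Header cells: the "History" column appears only when the URL is set.
  val driverHeader = Seq("Driver ID")
  val historyHeader = historyServerURL.map(_ => Seq("History")).getOrElse(Nil)
  val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
  val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader

  // Row cells: emit a link cell only when the URL is set, mirroring the headers.
  def historyCell(frameworkId: String): Seq[Node] =
    historyServerURL match {
      case Some(url) => <td><a href={s"$url/history/$frameworkId"}>{frameworkId}</a></td>
      case None => Nil
    }
}
```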
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index baad098a0c..604978967d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ui.JettyUtils._
 private[spark] class MesosClusterUI(
     securityManager: SecurityManager,
     port: Int,
-    conf: SparkConf,
+    val conf: SparkConf,
     dispatcherPublicAddress: String,
     val scheduler: MesosClusterScheduler)
   extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index ae531e1997..2189fca67a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -43,6 +43,8 @@ import org.apache.spark.util.Utils
  * @param slaveId Slave ID that the task is assigned to
  * @param mesosTaskStatus The last known task status update.
  * @param startDate The date the task was launched
+ * @param finishDate The date the task finished
+ * @param frameworkId Mesos framework ID the task registers with
  */
 private[spark] class MesosClusterSubmissionState(
     val driverDescription: MesosDriverDescription,
@@ -50,12 +52,13 @@ private[spark] class MesosClusterSubmissionState(
     val slaveId: SlaveID,
     var mesosTaskStatus: Option[TaskStatus],
     var startDate: Date,
-    var finishDate: Option[Date])
+    var finishDate: Option[Date],
+    val frameworkId: String)
   extends Serializable {
 
   def copy(): MesosClusterSubmissionState = {
     new MesosClusterSubmissionState(
-      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
+      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
   }
 }
@@ -63,6 +66,7 @@ private[spark] class MesosClusterSubmissionState(
  * Tracks the retry state of a driver, which includes the next time it should be scheduled
  * and necessary information to do exponential backoff.
  * This class is not thread-safe, and we expect the caller to handle synchronizing state.
+ *
  * @param lastFailureStatus Last Task status when it failed.
  * @param retries Number of times it has been retried.
  * @param nextRetry Time at which it should be retried next
@@ -80,6 +84,7 @@ private[spark] class MesosClusterRetryState(
 /**
  * The full state of the cluster scheduler, currently being used for displaying
  * information on the UI.
+ *
  * @param frameworkId Mesos Framework id for the cluster scheduler.
  * @param masterUrl The Mesos master url
  * @param queuedDrivers All drivers queued to be launched
@@ -355,7 +360,15 @@ private[spark] class MesosClusterScheduler(
 
   private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
     desc.conf.getOption("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+  }
+
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
+
+  private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
+    s"${frameworkId}-${desc.submissionId}"
   }
 
   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
@@ -364,7 +377,11 @@
     val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
     val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
 
-    driverEnv ++ executorEnv ++ desc.command.environment
+    var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
+
+    driverEnv ++ executorEnv ++ commandEnv
   }
 
   val envBuilder = Environment.newBuilder()
@@ -552,7 +569,7 @@ private[spark] class MesosClusterScheduler(
       logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
         submission.submissionId)
       val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
-        None, new Date(), None)
+        None, new Date(), None, getDriverFrameworkID(submission))
       launchedDrivers(submission.submissionId) = newState
       launchedDriversState.persist(submission.submissionId, newState)
       afterLaunchCallback(submission.submissionId)
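The `adjust` helper added above is a generic "update a key with a default" on immutable maps; it is what lets the scheduler append `-Dspark.mesos.driver.frameworkId=...` to `SPARK_SUBMIT_OPTS` whether or not the submission already set that variable. A standalone sketch of its behavior, with a placeholder framework ID:

```scala
// Update key k in an immutable map, falling back to `default` when k is absent.
def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B): collection.Map[A, B] =
  m.updated(k, f(m.getOrElse(k, default)))

val withOpts = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")

// Existing value is extended:
//   Map(SPARK_SUBMIT_OPTS -> "-Xmx1g -Dspark.mesos.driver.frameworkId=fw-0001")
adjust(withOpts, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dspark.mesos.driver.frameworkId=fw-0001")

// Missing key gets the default ("") before f runs, so the entry is created:
//   Map(SPARK_SUBMIT_OPTS -> " -Dspark.mesos.driver.frameworkId=fw-0001")
adjust(Map.empty[String, String], "SPARK_SUBMIT_OPTS", "")(
  v => s"$v -Dspark.mesos.driver.frameworkId=fw-0001")
```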
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5177557132..0933a03a0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+      None,
+      None,
+      sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
+
+    unsetFrameworkID(sc)
     startScheduler(driver)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index d8d661da31..f1e48fa7c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -77,8 +77,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+      Option.empty,
+      Option.empty,
+      sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
+
+    unsetFrameworkID(sc)
     startScheduler(driver)
   }
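In both backends the new trailing argument threads `spark.mesos.driver.frameworkId` through to driver creation. A sketch of how an optional, dispatcher-assigned ID can be applied to the Mesos `FrameworkInfo` builder; this illustrates the Mesos protobuf builder API, not the exact Spark helper:

```scala
import org.apache.mesos.Protos.{FrameworkID, FrameworkInfo}

def buildFrameworkInfo(
    user: String,
    appName: String,
    frameworkId: Option[String]): FrameworkInfo = {
  val builder = FrameworkInfo.newBuilder()
    .setUser(user)
    .setName(appName)
  // Register under the dispatcher-chosen ID when one was provided;
  // otherwise the Mesos master assigns a fresh framework ID at registration.
  frameworkId.foreach(id => builder.setId(FrameworkID.newBuilder().setValue(id).build()))
  builder.build()
}
```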
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index cd4b45f8de..81db789166 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -357,4 +357,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
   }
 
+  /**
+   * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
+   * submissions with frameworkIDs. However, this causes issues when a driver process launches
+   * more than one framework (more than one SparkContext), because they all try to register with
+   * the same frameworkID. To enforce that only the first driver registers with the configured
+   * framework ID, the driver calls this method after the first registration.
+   */
+  def unsetFrameworkID(sc: SparkContext) {
+    sc.conf.remove("spark.mesos.driver.frameworkId")
+    System.clearProperty("spark.mesos.driver.frameworkId")
+  }
 }
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index d037e7be0a..613da68531 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -468,6 +468,16 @@ See the [configuration page](configuration.html) for information on Spark config
     If unset it will point to Spark's internal web UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set the URL of the history
+    server.  The dispatcher will then link each driver to its entry
+    in the history server.
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
-- 
cgit v1.2.3
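For reference, a small sketch of what the dispatcher page computes from the new setting: with `spark.mesos.dispatcher.historyServer.url` configured, each launched driver links to `<url>/history/<frameworkId>`. The host and framework ID below are placeholders:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.dispatcher.historyServer.url", "http://history.example.com:18080")

// Placeholder ID of the form <dispatcher frameworkId>-<submissionId>,
// as produced by getDriverFrameworkID above.
val frameworkId = "fw-0001-driver-20160809101112-0001"

// None when the setting is absent, in which case no link is rendered.
val historyLink: Option[String] =
  conf.getOption("spark.mesos.dispatcher.historyServer.url")
    .map(url => s"$url/history/$frameworkId")
```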