From 44f654eecd3c181f2aeaff3871acf7f00eacc6b9 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 10 Apr 2014 20:43:56 -0700
Subject: SPARK-1202: Improvements to task killing in the UI.

1. Adds a separate endpoint for the killing logic that is outside of a page.
2. Narrows the scope of the killingEnabled tracking.
3. Some style improvements.

Author: Patrick Wendell

Closes #386 from pwendell/kill-link and squashes the following commits:

8efe02b [Patrick Wendell] Improvements to task killing in the UI.
---
 .../main/resources/org/apache/spark/ui/static/webui.css |  9 +++++++++
 .../src/main/scala/org/apache/spark/ui/JettyUtils.scala |  2 ++
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala   |  5 ++---
 .../main/scala/org/apache/spark/ui/jobs/IndexPage.scala | 11 -----------
 .../scala/org/apache/spark/ui/jobs/JobProgressUI.scala  | 17 ++++++++++++++++-
 .../scala/org/apache/spark/ui/jobs/StageTable.scala     | 16 +++++++++-------
 6 files changed, 38 insertions(+), 22 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index fe54c34ffb..599c3ac9b5 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -78,3 +78,12 @@ table.sortable thead {
   background-repeat: repeat-x;
   filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
 }
+
+span.kill-link {
+  margin-right: 2px;
+  color: gray;
+}
+
+span.kill-link a {
+  color: gray;
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 9ce0398d01..dd0818e8ab 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -104,10 +104,12 @@ private[spark] object JettyUtils extends Logging {
   def createRedirectHandler(
       srcPath: String,
       destPath: String,
+      beforeRedirect: HttpServletRequest => Unit = x => (),
       basePath: String = ""): ServletContextHandler = {
     val prefixedDestPath = attachPrefix(basePath, destPath)
     val servlet = new HttpServlet {
       override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+        beforeRedirect(request)
         // Make sure we don't end up with "//" in the middle
         val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
         response.sendRedirect(newUrl)
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index dac11ec1cf..4c891d73af 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
 /** Top level user interface for Spark */
 private[spark] class SparkUI(
     val sc: SparkContext,
-    conf: SparkConf,
+    val conf: SparkConf,
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
@@ -46,7 +46,6 @@ private[spark] class SparkUI(
   val live = sc != null

   val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
-  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)

   private val localHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
@@ -70,7 +69,7 @@ private[spark] class SparkUI(
       metricsServletHandlers ++
       Seq[ServletContextHandler] (
         createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
-        createRedirectHandler("/", "/stages", basePath)
+        createRedirectHandler("/", "/stages", basePath = basePath)
       )
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 5da5d1f2a3..8619a31380 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -32,7 +32,6 @@ private[ui] class IndexPage(parent: JobProgressUI) {
   private val sc = parent.sc
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
-  private val killEnabled = parent.killEnabled

   private def appName = parent.appName

@@ -43,16 +42,6 @@ private[ui] class IndexPage(parent: JobProgressUI) {
     val failedStages = listener.failedStages.reverse.toSeq
     val now = System.currentTimeMillis()

-    if (killEnabled) {
-      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-
-      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
-        sc.cancelStage(stageId)
-      }
-    }
-
-
     val activeStagesTable =
       new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
     val completedStagesTable =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 9de659d6c7..30e3f35f21 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,7 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
-  val killEnabled = parent.killEnabled
+  val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)

   lazy val listener = _listener.get
   lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
@@ -51,7 +51,22 @@ private[ui] class JobProgressUI(parent: SparkUI) {

   def formatDuration(ms: Long) = Utils.msDurationToString(ms)

+  private def handleKillRequest(request: HttpServletRequest) = {
+    if (killEnabled) {
+      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+        sc.cancelStage(stageId)
+      }
+      // Do a quick pause here to give Spark time to kill the stage so it shows up as
+      // killed after the refresh. Note that this will block the serving thread so the
+      // time should be limited in duration.
+      Thread.sleep(100)
+    }
+  }
+
   def getHandlers = Seq[ServletContextHandler](
+    createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
     createServletHandler("/stages/stage",
       (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
     createServletHandler("/stages/pool",
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 1e874ae496..e419fae5a6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -76,20 +76,22 @@ private[ui] class StageTable(
   }

   private def makeDescription(s: StageInfo): Seq[Node] = {
+    // scalastyle:off
+    val killLink = if (killEnabled) {
+
+      (kill)
+
+    }
+    // scalastyle:on
+    val nameLink = {s.name}
-    val killLink = if (killEnabled) {
-      [
-      Kill
-      ]
-    }
     val description = listener.stageIdToDescription.get(s.stageId)
       .map(d => {d} {nameLink} {killLink})
-      .getOrElse({nameLink} {killLink})
+      .getOrElse({killLink}{nameLink})
     return description
   }
--
cgit v1.2.3
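
For readers following the patch, the sketch below illustrates the pattern the change introduces: the kill action gets its own /stages/stage/kill endpoint, a beforeRedirect-style callback performs the side effect, and the handler then redirects back to /stages, so page rendering no longer mutates state. This is a minimal, self-contained Scala example against a Jetty 9-style javax.servlet API, not Spark's actual JettyUtils or JobProgressUI; the KillRedirectSketch object, the in-memory "killed" set, and port 8080 are invented for illustration, and the real handleKillRequest additionally checks listener.activeStages and pauses briefly before the redirect.

// Minimal sketch, not Spark's real code: a redirect endpoint that runs a
// side-effecting callback (the "kill") before sending the browser back to the
// listing page.
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

object KillRedirectSketch {
  // Stand-in for SparkContext.cancelStage: just record which stage ids were "killed".
  private val killed = scala.collection.mutable.Set[Int]()

  /** Serve GET requests under `srcPath`, invoke `beforeRedirect`, then redirect to `destPath`. */
  def createRedirectHandler(
      srcPath: String,
      destPath: String,
      beforeRedirect: HttpServletRequest => Unit = _ => ()): ServletContextHandler = {
    val servlet = new HttpServlet {
      override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
        beforeRedirect(request)
        response.sendRedirect(destPath)
      }
    }
    val handler = new ServletContextHandler()
    handler.setContextPath(srcPath)
    handler.addServlet(new ServletHolder(servlet), "/")
    handler
  }

  // Same parameter parsing as the patched handleKillRequest; the real version also
  // requires the stage to be in listener.activeStages and sleeps ~100 ms afterwards.
  private def handleKillRequest(request: HttpServletRequest): Unit = {
    val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
    val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
    if (stageId >= 0 && killFlag) {
      killed += stageId // real code: sc.cancelStage(stageId)
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new Server(8080)
    server.setHandler(createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest))
    server.start()
    // GET http://localhost:8080/stages/stage/kill?id=3&terminate=true marks stage 3
    // as killed and answers with a 302 redirect to /stages.
    server.join()
  }
}

Keeping the kill behind a dedicated redirect endpoint, as the commit message notes, means a refresh of /stages cannot re-trigger a kill the way the old query-parameter handling inside IndexPage.render could.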