aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala17
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala47
-rw-r--r--docs/configuration.md2
7 files changed, 104 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ef71db8979..f631a047a7 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -58,14 +58,13 @@ private[spark] class SparkUI private (
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
-
- val stagesTab = new StagesTab(this)
-
var appId: String = _
/** Initialize all components of the server. */
def initialize() {
- attachTab(new JobsTab(this))
+ val jobsTab = new JobsTab(this)
+ attachTab(jobsTab)
+ val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
@@ -73,7 +72,9 @@ private[spark] class SparkUI private (
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
- // This should be POST only, but, the YARN AM proxy won't proxy POSTs
+ // These should be POST only, but, the YARN AM proxy won't proxy POSTs
+ attachHandler(createRedirectHandler(
+ "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index f6713097b9..173fc3cf31 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
request: HttpServletRequest,
tableHeaderId: String,
jobTag: String,
- jobs: Seq[JobUIData]): Seq[Node] = {
+ jobs: Seq[JobUIData],
+ killEnabled: Boolean): Seq[Node] = {
val allParameters = request.getParameterMap.asScala.toMap
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
@@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
parameterOtherTable,
parent.jobProgresslistener.stageIdToInfo,
parent.jobProgresslistener.stageIdToData,
+ killEnabled,
currentTime,
jobIdTitle,
pageSize = jobPageSize,
@@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
- val activeJobsTable = jobsTable(request, "active", "activeJob", activeJobs)
- val completedJobsTable = jobsTable(request, "completed", "completedJob", completedJobs)
- val failedJobsTable = jobsTable(request, "failed", "failedJob", failedJobs)
+ val activeJobsTable =
+ jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
+ val completedJobsTable =
+ jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
+ val failedJobsTable =
+ jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -483,6 +488,7 @@ private[ui] class JobPagedTable(
parameterOtherTable: Iterable[String],
stageIdToInfo: HashMap[Int, StageInfo],
stageIdToData: HashMap[(Int, Int), StageUIData],
+ killEnabled: Boolean,
currentTime: Long,
jobIdTitle: String,
pageSize: Int,
@@ -586,12 +592,30 @@ private[ui] class JobPagedTable(
override def row(jobTableRow: JobTableRowData): Seq[Node] = {
val job = jobTableRow.jobData
+ val killLink = if (killEnabled) {
+ val confirm =
+ s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " +
+ "{ this.parentNode.submit(); return true; } else { return false; }"
+ // SPARK-6846 this should be POST-only but YARN AM won't proxy POST
+ /*
+ val killLinkUri = s"$basePathUri/jobs/job/kill/"
+ <form action={killLinkUri} method="POST" style="display:inline">
+ <input type="hidden" name="id" value={job.jobId.toString}/>
+ <a href="#" onclick={confirm} class="kill-link">(kill)</a>
+ </form>
+ */
+ val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}"
+ <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
+ } else {
+ Seq.empty
+ }
+
<tr id={"job-" + job.jobId}>
<td>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
- {jobTableRow.jobDescription}
+ {jobTableRow.jobDescription} {killLink}
<a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a>
</td>
<td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 7b00b558d5..620c54c2dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui.jobs
+import javax.servlet.http.HttpServletRequest
+
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.{SparkUI, SparkUITab}
@@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
+
+ def handleKillRequest(request: HttpServletRequest): Unit = {
+ if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
+ val jobId = Option(request.getParameter("id")).map(_.toInt)
+ jobId.foreach { id =>
+ if (jobProgresslistener.activeJobs.contains(id)) {
+ sc.foreach(_.cancelJob(id))
+ // Do a quick pause here to give Spark time to kill the job so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9b9b4681ba..c9d0431e2d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -353,12 +353,13 @@ private[ui] class StagePagedTable(
val killLinkUri = s"$basePathUri/stages/stage/kill/"
<form action={killLinkUri} method="POST" style="display:inline">
<input type="hidden" name="id" value={s.stageId.toString}/>
- <input type="hidden" name="terminate" value="true"/>
<a href="#" onclick={confirm} class="kill-link">(kill)</a>
</form>
*/
- val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true"
+ val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
<a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
+ } else {
+ Seq.empty
}
val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 573192ac17..c1f2511437 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
- val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
- val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
- if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
- sc.get.cancelStage(stageId)
+ val stageId = Option(request.getParameter("id")).map(_.toInt)
+ stageId.foreach { id =>
+ if (progressListener.activeStages.contains(id)) {
+ sc.foreach(_.cancelStage(id))
+ // Do a quick pause here to give Spark time to kill the stage so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
}
- // Do a quick pause here to give Spark time to kill the stage so it shows up as
- // killed after the refresh. Note that this will block the serving thread so the
- // time should be limited in duration.
- Thread.sleep(100)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index fd12a21b79..e5d408a167 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -197,6 +197,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ goToUi(sc, "/jobs")
+ assert(hasKillLink)
+ }
+ }
+
+ withSpark(newSparkContext(killEnabled = false)) { sc =>
+ runSlowJob(sc)
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ goToUi(sc, "/jobs")
+ assert(!hasKillLink)
+ }
+ }
+
+ withSpark(newSparkContext(killEnabled = true)) { sc =>
+ runSlowJob(sc)
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/stages")
assert(hasKillLink)
}
@@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
test("kill stage POST/GET response is correct") {
- def getResponseCode(url: URL, method: String): Int = {
- val connection = url.openConnection().asInstanceOf[HttpURLConnection]
- connection.setRequestMethod(method)
- connection.connect()
- val code = connection.getResponseCode()
- connection.disconnect()
- code
+ withSpark(newSparkContext(killEnabled = true)) { sc =>
+ sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ val url = new URL(
+ sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0")
+ // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
+ getResponseCode(url, "GET") should be (200)
+ getResponseCode(url, "POST") should be (200)
+ }
}
+ }
+ test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
- sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true")
+ sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
@@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
+ def getResponseCode(url: URL, method: String): Int = {
+ val connection = url.openConnection().asInstanceOf[HttpURLConnection]
+ connection.setRequestMethod(method)
+ try {
+ connection.connect()
+ connection.getResponseCode()
+ } finally {
+ connection.disconnect()
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
diff --git a/docs/configuration.md b/docs/configuration.md
index b07867d99a..6600cb6c0a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -632,7 +632,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.killEnabled</code></td>
<td>true</td>
<td>
- Allows stages and corresponding jobs to be killed from the web ui.
+ Allows jobs and stages to be killed from the web UI.
</td>
</tr>
<tr>