author    Sundeep Narravula <sundeepn@superduel.local>  2014-04-10 17:10:11 -0700
committer Patrick Wendell <pwendell@gmail.com>          2014-04-10 17:10:11 -0700
commit    2c557837b4a12c644cc37bd00d02be04f3807637 (patch)
tree      32a1d0c76aa7cc1215d898f517849528fb571ecb /core
parent    f99401a6308d5b9a9259d7597a35ba92f927aa50 (diff)
SPARK-1202 - Add a "cancel" button in the UI for stages
Author: Sundeep Narravula <sundeepn@superduel.local>
Author: Sundeep Narravula <sundeepn@dhcpx-204-110.corp.yahoo.com>

Closes #246 from sundeepn/uikilljob and squashes the following commits:

5fdd0e2 [Sundeep Narravula] Fix test string
f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars
d1daeb9 [Sundeep Narravula] Incorporating review comments.
8d97923 [Sundeep Narravula] Ability to kill jobs through the UI. This behavior can be turned on by setting the following variable: spark.ui.killEnabled=true (default=false). Adds the DAGScheduler event StageCancelled and corresponding handlers, and adds a cancellation reason to the handlers.
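
As a rough usage sketch (not part of this patch), the flag announced above is set like any other SparkConf entry. The app name, master, and the deliberately slow job below are assumptions, chosen only so an active stage lingers in the UI long enough to kill:

    import org.apache.spark.{SparkConf, SparkContext}

    object KillEnabledDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("kill-enabled-demo")       // hypothetical app name
          .setMaster("local[2]")
          .set("spark.ui.killEnabled", "true")   // expose the Kill link on /stages
        val sc = new SparkContext(conf)

        // A slow job so the active stage stays visible (and killable) in the web UI.
        sc.parallelize(1 to 1000, 10).map { i => Thread.sleep(100); i }.count()

        sc.stop()
      }
    }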
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      |  32
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala                  |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala           |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala       |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala           |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala          |  29
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala |   2
9 files changed, 80 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6c9b7000d..3bcc8ce2b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,6 +1138,16 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelAllJobs()
}
+ /** Cancel a given job if it's scheduled or running */
+ private[spark] def cancelJob(jobId: Int) {
+ dagScheduler.cancelJob(jobId)
+ }
+
+ /** Cancel a given stage and all jobs associated with it */
+ private[spark] def cancelStage(stageId: Int) {
+ dagScheduler.cancelStage(stageId)
+ }
+
/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
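
Since both new methods are private[spark], only code inside the org.apache.spark package tree can reach them. The helper below is a hypothetical sketch of how a caller such as the web UI would use the new entry point:

    // Hedged sketch; killStage is a hypothetical helper, not part of the patch.
    private[spark] def killStage(sc: SparkContext, stageId: Int) {
      sc.cancelStage(stageId)  // fans out to every job that uses this stage
    }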
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c41d6d75a1..c6cbf14e20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -512,6 +512,13 @@ class DAGScheduler(
}
/**
+ * Cancel all jobs associated with a running or scheduled stage.
+ */
+ def cancelStage(stageId: Int) {
+ eventProcessActor ! StageCancelled(stageId)
+ }
+
+ /**
* Process one event retrieved from the event processing actor.
*
* @param event The event to be processed.
@@ -551,6 +558,9 @@ class DAGScheduler(
submitStage(finalStage)
}
+ case StageCancelled(stageId) =>
+ handleStageCancellation(stageId)
+
case JobCancelled(jobId) =>
handleJobCancellation(jobId)
@@ -560,11 +570,13 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
- jobIds.foreach(handleJobCancellation)
+ jobIds.foreach(jobId => handleJobCancellation(jobId,
+ "as part of cancelled job group %s".format(groupId)))
case AllJobsCancelled =>
// Cancel all running jobs.
- runningStages.map(_.jobId).foreach(handleJobCancellation)
+ runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
+ "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -991,11 +1003,23 @@ class DAGScheduler(
}
}
- private def handleJobCancellation(jobId: Int) {
+ private def handleStageCancellation(stageId: Int) {
+ if (stageIdToJobIds.contains(stageId)) {
+ val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
+ jobsThatUseStage.foreach(jobId => {
+ handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
+ })
+ } else {
+ logInfo("No active jobs to kill for Stage " + stageId)
+ }
+ }
+
+ private def handleJobCancellation(jobId: Int, reason: String = "") {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
- failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
+ failJobAndIndependentStages(jobIdToActiveJob(jobId),
+ "Job %d cancelled %s".format(jobId, reason), None)
}
}
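
The hunk above follows the DAGScheduler's message-passing discipline: a cancellation request becomes an immutable event object handed to eventProcessActor with !, a fire-and-forget send, so cancelStage returns before any job actually dies. A self-contained, REPL-style sketch of that dispatch pattern (plain Scala, no Akka, hypothetical names):

    sealed trait SchedulerEvent
    case class StageCancelled(stageId: Int) extends SchedulerEvent
    case class JobCancelled(jobId: Int) extends SchedulerEvent

    // Stand-in for DAGScheduler.processEvent: one loop pattern-matches on
    // every event type and delegates to the matching handler.
    def processEvent(event: SchedulerEvent): Unit = event match {
      case StageCancelled(stageId) => println(s"cancelling all jobs using stage $stageId")
      case JobCancelled(jobId)     => println(s"cancelling job $jobId")
    }

    processEvent(StageCancelled(3))  // prints: cancelling all jobs using stage 3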
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 293cfb6564..7367c08b5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
+
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b8e6e15880..dac11ec1cf 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
val live = sc != null
val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
+ val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
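
One subtlety in this hunk: SparkConf.getBoolean's second argument is the fallback used when the key is absent, and here it is true, so as written the kill links are on unless spark.ui.killEnabled is explicitly set to false (the commit message above says default=false). A REPL-style illustration:

    val conf = new org.apache.spark.SparkConf()
    conf.getBoolean("spark.ui.killEnabled", true)   // true: key unset, fallback wins
    conf.set("spark.ui.killEnabled", "false")
    conf.getBoolean("spark.ui.killEnabled", true)   // false: explicit setting wins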
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index f811aff616..5da5d1f2a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -32,6 +32,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
private val sc = parent.sc
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
+ private val killEnabled = parent.killEnabled
private def appName = parent.appName
@@ -42,7 +43,18 @@ private[ui] class IndexPage(parent: JobProgressUI) {
val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()
- val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+ if (killEnabled) {
+ val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+ val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+
+ if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+ sc.cancelStage(stageId)
+ }
+ }
+
+
+ val activeStagesTable =
+ new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
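
The parsing idiom used above, isolated into hypothetical helpers: Option(...) guards against the servlet API returning null for a missing parameter, but .toBoolean and .toInt still throw on malformed (rather than absent) values:

    def boolParam(raw: String): Boolean = Option(raw).getOrElse("false").toBoolean
    def intParam(raw: String): Int      = Option(raw).getOrElse("-1").toInt

    boolParam(null)    // false: "terminate" absent from the query string
    boolParam("true")  // true:  ?terminate=true
    intParam("7")      // 7:     ?id=7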
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index ad1a12cdc4..9de659d6c7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,6 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
+ val killEnabled = parent.killEnabled
lazy val listener = _listener.get
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0bcbd7461c..b6c3e3cf45 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{Utils, Distribution}
private[ui] class StagePage(parent: JobProgressUI) {
private val basePath = parent.basePath
private lazy val listener = parent.listener
+ private lazy val sc = parent.sc
private def appName = parent.appName
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac61568af5..1e874ae496 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
+private[ui] class StageTable(
+ stages: Seq[StageInfo],
+ parent: JobProgressUI,
+ killEnabled: Boolean = false) {
+
private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</div>
}
- /** Render an HTML row that represents a stage */
- private def stageRow(s: StageInfo): Seq[Node] = {
- val poolName = listener.stageIdToPool.get(s.stageId)
+ private def makeDescription(s: StageInfo): Seq[Node] = {
val nameLink =
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
{s.name}
</a>
+ val killLink = if (killEnabled) {
+ <div>[<a href=
+ {"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
+ Kill
+ </a>]</div>
+
+ }
val description = listener.stageIdToDescription.get(s.stageId)
- .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+ .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
+ .getOrElse(<div>{nameLink} {killLink}</div>)
+
+ return description
+ }
+
+ /** Render an HTML row that represents a stage */
+ private def stageRow(s: StageInfo): Seq[Node] = {
+ val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
@@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</a>
</td>
}}
- <td>{description}</td>
+ <td>{makeDescription(s)}</td>
<td valign="middle">{submissionTime}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
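
For reference, the link makeDescription emits closes the loop with IndexPage above: clicking Kill issues a GET whose id and terminate parameters IndexPage reads back before calling sc.cancelStage. With an empty base path and stage 3 (illustrative values):

    val basePath = ""
    val stageId  = 3
    "%s/stages?id=%s&terminate=true".format(basePath, stageId)
    // => "/stages?id=3&terminate=true"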
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a74724d785..db4df1d121 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val rdd = makeRdd(1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
- assert(failure.getMessage === s"Job $jobId cancelled")
+ assert(failure.getMessage === s"Job $jobId cancelled ")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
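
The extra trailing space in the expected message is deliberate: handleJobCancellation's new reason parameter defaults to "", and the format string keeps its separator space either way. A REPL-style check:

    val jobId  = 0
    val reason = ""                              // the default reason
    "Job %d cancelled %s".format(jobId, reason)  // "Job 0 cancelled " (trailing space)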