aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAla Luszczak <ala@databricks.com>2017-02-10 21:10:02 +0100
committerReynold Xin <rxin@databricks.com>2017-02-10 21:10:02 +0100
commitd785217b791882e075ad537852d49d78fc1ca31b (patch)
treefbfb20bdc0098918327080e7dca2d88feedd6d7f /core/src
parent3a43ae7c0bbce8eda98f50a97a0138f860197a98 (diff)
downloadspark-d785217b791882e075ad537852d49d78fc1ca31b.tar.gz
spark-d785217b791882e075ad537852d49d78fc1ca31b.tar.bz2
spark-d785217b791882e075ad537852d49d78fc1ca31b.zip
[SPARK-19549] Allow providing reason for stage/job cancelling
## What changes were proposed in this pull request? This change add an optional argument to `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide exact reason for the cancellation. ## How was this patch tested? Adds unit test. Author: Ala Luszczak <ala@databricks.com> Closes #16887 from ala/cancel.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala35
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala69
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
6 files changed, 123 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index eb13686f26..cbab7b8844 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2207,10 +2207,32 @@ class SparkContext(config: SparkConf) extends Logging {
* Cancel a given job if it's scheduled or running.
*
* @param jobId the job ID to cancel
+ * @param reason optional reason for cancellation
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
- def cancelJob(jobId: Int) {
- dagScheduler.cancelJob(jobId)
+ def cancelJob(jobId: Int, reason: String): Unit = {
+ dagScheduler.cancelJob(jobId, Option(reason))
+ }
+
+ /**
+ * Cancel a given job if it's scheduled or running.
+ *
+ * @param jobId the job ID to cancel
+ * @note Throws `InterruptedException` if the cancel message cannot be sent
+ */
+ def cancelJob(jobId: Int): Unit = {
+ dagScheduler.cancelJob(jobId, None)
+ }
+
+ /**
+ * Cancel a given stage and all jobs associated with it.
+ *
+ * @param stageId the stage ID to cancel
+ * @param reason reason for cancellation
+ * @note Throws `InterruptedException` if the cancel message cannot be sent
+ */
+ def cancelStage(stageId: Int, reason: String): Unit = {
+ dagScheduler.cancelStage(stageId, Option(reason))
}
/**
@@ -2219,8 +2241,8 @@ class SparkContext(config: SparkConf) extends Logging {
* @param stageId the stage ID to cancel
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
- def cancelStage(stageId: Int) {
- dagScheduler.cancelStage(stageId)
+ def cancelStage(stageId: Int): Unit = {
+ dagScheduler.cancelStage(stageId, None)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6177bafc11..b9d7e1328d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -696,9 +696,9 @@ class DAGScheduler(
/**
* Cancel a job that is running or waiting in the queue.
*/
- def cancelJob(jobId: Int): Unit = {
+ def cancelJob(jobId: Int, reason: Option[String]): Unit = {
logInfo("Asked to cancel job " + jobId)
- eventProcessLoop.post(JobCancelled(jobId))
+ eventProcessLoop.post(JobCancelled(jobId, reason))
}
/**
@@ -719,7 +719,7 @@ class DAGScheduler(
private[scheduler] def doCancelAllJobs() {
// Cancel all running jobs.
runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
- reason = "as part of cancellation of all jobs"))
+ Option("as part of cancellation of all jobs")))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
}
@@ -727,8 +727,8 @@ class DAGScheduler(
/**
* Cancel all jobs associated with a running or scheduled stage.
*/
- def cancelStage(stageId: Int) {
- eventProcessLoop.post(StageCancelled(stageId))
+ def cancelStage(stageId: Int, reason: Option[String]) {
+ eventProcessLoop.post(StageCancelled(stageId, reason))
}
/**
@@ -785,7 +785,8 @@ class DAGScheduler(
}
}
val jobIds = activeInGroup.map(_.jobId)
- jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
+ jobIds.foreach(handleJobCancellation(_,
+ Option("part of cancelled job group %s".format(groupId))))
}
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -1377,24 +1378,30 @@ class DAGScheduler(
}
}
- private[scheduler] def handleStageCancellation(stageId: Int) {
+ private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
stageIdToStage.get(stageId) match {
case Some(stage) =>
val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
jobsThatUseStage.foreach { jobId =>
- handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
+ val reasonStr = reason match {
+ case Some(originalReason) =>
+ s"because $originalReason"
+ case None =>
+ s"because Stage $stageId was cancelled"
+ }
+ handleJobCancellation(jobId, Option(reasonStr))
}
case None =>
logInfo("No active jobs to kill for Stage " + stageId)
}
}
- private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
+ private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]) {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(
- jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
+ jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
}
}
@@ -1636,11 +1643,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
- case StageCancelled(stageId) =>
- dagScheduler.handleStageCancellation(stageId)
+ case StageCancelled(stageId, reason) =>
+ dagScheduler.handleStageCancellation(stageId, reason)
- case JobCancelled(jobId) =>
- dagScheduler.handleJobCancellation(jobId)
+ case JobCancelled(jobId, reason) =>
+ dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 03781a2a2b..cda0585f15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -53,9 +53,15 @@ private[scheduler] case class MapStageSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
-private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
+private[scheduler] case class StageCancelled(
+ stageId: Int,
+ reason: Option[String])
+ extends DAGSchedulerEvent
-private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
+private[scheduler] case class JobCancelled(
+ jobId: Int,
+ reason: Option[String])
+ extends DAGSchedulerEvent
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 9012289f04..65d7184231 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -50,7 +50,7 @@ private[spark] class JobWaiter[T](
* will fail this job with a SparkException.
*/
def cancel() {
- dagScheduler.cancelJob(jobId)
+ dagScheduler.cancelJob(jobId, None)
}
override def taskSucceeded(index: Int, result: Any): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8ae5d2fe96..5a41e1c619 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -22,19 +22,21 @@ import java.net.MalformedURLException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
import scala.concurrent.Await
-import scala.concurrent.duration.Duration
import com.google.common.io.Files
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.scalatest.concurrent.Eventually
import org.scalatest.Matchers._
-import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
import org.apache.spark.util.Utils
-class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
+
+class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
test("Only one SparkContext may be active at a time") {
// Regression test for SPARK-4180
@@ -465,4 +467,65 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
assert(!sc.listenerBus.listeners.contains(sparkListener1))
assert(sc.listenerBus.listeners.contains(sparkListener2))
}
+
+ test("Cancelling stages/jobs with custom reasons.") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val REASON = "You shall not pass"
+
+ val listener = new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ if (SparkContextSuite.cancelStage) {
+ eventually(timeout(10.seconds)) {
+ assert(SparkContextSuite.isTaskStarted)
+ }
+ sc.cancelStage(taskStart.stageId, REASON)
+ SparkContextSuite.cancelStage = false
+ }
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ if (SparkContextSuite.cancelJob) {
+ eventually(timeout(10.seconds)) {
+ assert(SparkContextSuite.isTaskStarted)
+ }
+ sc.cancelJob(jobStart.jobId, REASON)
+ SparkContextSuite.cancelJob = false
+ }
+ }
+ }
+ sc.addSparkListener(listener)
+
+ for (cancelWhat <- Seq("stage", "job")) {
+ SparkContextSuite.isTaskStarted = false
+ SparkContextSuite.cancelStage = (cancelWhat == "stage")
+ SparkContextSuite.cancelJob = (cancelWhat == "job")
+
+ val ex = intercept[SparkException] {
+ sc.range(0, 10000L).mapPartitions { x =>
+ org.apache.spark.SparkContextSuite.isTaskStarted = true
+ x
+ }.cartesian(sc.range(0, 10L))count()
+ }
+
+ ex.getCause() match {
+ case null =>
+ assert(ex.getMessage().contains(REASON))
+ case cause: SparkException =>
+ assert(cause.getMessage().contains(REASON))
+ case cause: Throwable =>
+ fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+ }
+
+ eventually(timeout(20.seconds)) {
+ assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+ }
+ }
+ }
+
+}
+
+object SparkContextSuite {
+ @volatile var cancelJob = false
+ @volatile var cancelStage = false
+ @volatile var isTaskStarted = false
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f3d3f701af..4e5f267e23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -329,7 +329,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
/** Sends JobCancelled to the DAG scheduler. */
private def cancel(jobId: Int) {
- runEvent(JobCancelled(jobId))
+ runEvent(JobCancelled(jobId, None))
}
test("[SPARK-3353] parent stage should have lower stage id") {