about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Aaron Davidson <aaron@databricks.com> 2014-04-23 16:52:49 -0700
committer: Reynold Xin <rxin@apache.org> 2014-04-23 16:52:49 -0700
commit: 432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 (patch)
tree: 83885c8515418ff00f9d20673bfe21f4dbe9daf9 /core
parent: dd1b7a61d9193c93ab95ab550622259f4bc26f53 (diff)
download: spark-432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3.tar.gz
download: spark-432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3.tar.bz2
download: spark-432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3.zip
SPARK-1582 Invoke Thread.interrupt() when cancelling jobs
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (as the InterruptedException is reinterpreted as an IOException during communication with some node).

Author: Aaron Davidson <aaron@databricks.com>

Closes #498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
15 files changed, 57 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c14dce8273..dcb6b6824b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
+ *
+ * If interruptOnCancel is set to true for the job group, then job cancellation will result
+ * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+ * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+ * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
*/
- def setJobGroup(groupId: String, description: String) {
+ def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+ // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+ // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+ // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+ // JobProgressTab UI) on a per-job basis.
+ setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
}
/** Clear the current thread's job group ID and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+ setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
// Post init
@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+ private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8a843fbb0e..0d71fdbb03 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -169,7 +169,6 @@ private[spark] class PythonRDD[T: ClassTag](
val update = new Array[Byte](updateLen)
stream.readFully(update)
accumulator += Collections.singletonList(update)
-
}
Array.empty[Byte]
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6327ac0166..9ac7365f47 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
- case KillTask(taskId, _) =>
+ case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
} else {
- executor.killTask(taskId)
+ executor.killTask(taskId, interruptThread)
}
case x: DisassociatedEvent =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfb9c387e..914bc205ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,10 +136,10 @@ private[spark] class Executor(
threadPool.execute(tr)
}
- def killTask(taskId: Long) {
+ def killTask(taskId: Long, interruptThread: Boolean) {
val tr = runningTasks.get(taskId)
if (tr != null) {
- tr.kill()
+ tr.kill(interruptThread)
}
}
@@ -166,11 +166,11 @@ private[spark] class Executor(
@volatile private var killed = false
@volatile private var task: Task[Any] = _
- def kill() {
+ def kill(interruptThread: Boolean) {
logInfo("Executor is trying to kill task " + taskId)
killed = true
if (task != null) {
- task.kill()
+ task.kill(interruptThread)
}
}
@@ -257,7 +257,7 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
- case TaskKilledException => {
+ case TaskKilledException | _: InterruptedException if task.killed => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 6fc702fdb1..64e24506e8 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -76,7 +76,8 @@ private[spark] class MesosExecutorBackend
if (executor == null) {
logError("Received KillTask but executor was null")
} else {
- executor.killTask(t.getValue.toLong)
+ // TODO: Determine the 'interruptOnCancel' property set for the given job.
+ executor.killTask(t.getValue.toLong, interruptThread = false)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c6cbf14e20..dbde9b591d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1055,6 +1055,10 @@ class DAGScheduler(
val error = new SparkException(failureReason)
job.listener.jobFailed(error)
+ val shouldInterruptThread =
+ if (job.properties == null) false
+ else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
+
// Cancel all independent, running stages.
val stages = jobIdToStageIds(job.jobId)
if (stages.isEmpty) {
@@ -1073,7 +1077,7 @@ class DAGScheduler(
// This is the only job that uses this stage, so fail the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
- taskScheduler.cancelTasks(stageId)
+ taskScheduler.cancelTasks(stageId, shouldInterruptThread)
val stageInfo = stageToInfos(stage)
stageInfo.stageFailed(failureReason)
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index f1924a4573..6a6d8e609b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -28,5 +28,6 @@ private[spark] trait SchedulerBackend {
def reviveOffers(): Unit
def defaultParallelism(): Int
- def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
+ def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+ throw new UnsupportedOperationException
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a8bcb7dfe2..2ca3479c80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -44,8 +44,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+ taskThread = Thread.currentThread()
if (_killed) {
- kill()
+ kill(interruptThread = false)
}
runTask(context)
}
@@ -62,6 +63,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
// Task context, to be initialized in run().
@transient protected var context: TaskContext = _
+ // The actual Thread on which the task is running, if any. Initialized in run().
+ @volatile @transient private var taskThread: Thread = _
+
// A flag to indicate whether the task is killed. This is used in case context is not yet
// initialized when kill() is invoked.
@volatile @transient private var _killed = false
@@ -75,12 +79,16 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
* be called multiple times.
+ * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
*/
- def kill() {
+ def kill(interruptThread: Boolean) {
_killed = true
if (context != null) {
context.interrupted = true
}
+ if (interruptThread && taskThread != null) {
+ taskThread.interrupt()
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 92616c997e..819c35257b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -47,7 +47,7 @@ private[spark] trait TaskScheduler {
def submitTasks(taskSet: TaskSet): Unit
// Cancel a stage.
- def cancelTasks(stageId: Int)
+ def cancelTasks(stageId: Int, interruptThread: Boolean)
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fe72ab3e43..be19d9b885 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -170,7 +170,7 @@ private[spark] class TaskSchedulerImpl(
backend.reviveOffers()
}
- override def cancelTasks(stageId: Int): Unit = synchronized {
+ override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
// There are two possible cases here:
@@ -181,7 +181,7 @@ private[spark] class TaskSchedulerImpl(
// simply abort the stage.
tsm.runningTasksSet.foreach { tid =>
val execId = taskIdToExecutorId(tid)
- backend.killTask(tid, execId)
+ backend.killTask(tid, execId, interruptThread)
}
tsm.abort("Stage %s cancelled".format(stageId))
logInfo("Stage %d was cancelled".format(stageId))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 03bf760837..613fa7850b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,8 +31,8 @@ private[spark] class TaskSet(
val properties: Properties) {
val id: String = stageId + "." + attempt
- def kill() {
- tasks.foreach(_.kill())
+ def kill(interruptThread: Boolean) {
+ tasks.foreach(_.kill(interruptThread))
}
override def toString: String = "TaskSet " + id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4a9a1659d8..ddbc74e82a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
- case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
+ case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+ extends CoarseGrainedClusterMessage
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfc30b420..a6d6b3d26a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,8 +101,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
case ReviveOffers =>
makeOffers()
- case KillTask(taskId, executorId) =>
- executorActor(executorId) ! KillTask(taskId, executorId)
+ case KillTask(taskId, executorId, interruptThread) =>
+ executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
case StopDriver =>
sender ! true
@@ -207,8 +207,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
driverActor ! ReviveOffers
}
- override def killTask(taskId: Long, executorId: String) {
- driverActor ! KillTask(taskId, executorId)
+ override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+ driverActor ! KillTask(taskId, executorId, interruptThread)
}
override def defaultParallelism(): Int = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5cf30..43f0e18a0c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -30,7 +30,7 @@ private case class ReviveOffers()
private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
-private case class KillTask(taskId: Long)
+private case class KillTask(taskId: Long, interruptThread: Boolean)
/**
* Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
@@ -61,8 +61,8 @@ private[spark] class LocalActor(
reviveOffers()
}
- case KillTask(taskId) =>
- executor.killTask(taskId)
+ case KillTask(taskId, interruptThread) =>
+ executor.killTask(taskId, interruptThread)
}
def reviveOffers() {
@@ -99,8 +99,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def defaultParallelism() = totalCores
- override def killTask(taskId: Long, executorId: String) {
- localActor ! KillTask(taskId)
+ override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+ localActor ! KillTask(taskId, interruptThread)
}
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 35a7ac9d04..ff69eb7e53 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -58,7 +58,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
taskSets += taskSet
}
- override def cancelTasks(stageId: Int) {
+ override def cancelTasks(stageId: Int, interruptThread: Boolean) {
cancelledStages += stageId
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}