author     Reynold Xin <rxin@apache.org>   2013-10-11 18:15:04 -0700
committer  Reynold Xin <rxin@apache.org>   2013-10-11 18:15:04 -0700
commit     ab0940f0c258085bbf930d43be0b9034aad039cf (patch)
tree       811dcd9dec83b899dbfc5979e0b12bb8d8354ad9
parent     97ffebbe87a63da994191dd71727e089b15fa0e5 (diff)
Job cancellation: addressed code review feedback round 2 from Kay.
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala                            |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala                     |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala | 80
3 files changed, 47 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9185b9529c..85018cb046 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -56,7 +56,7 @@ trait FutureAction[T] extends Future[T] {
   override def result(atMost: Duration)(implicit permit: CanAwait): T
 
   /**
-   * When this action is completed, either through an exception, or a value, apply the provided
+   * When this action is completed, either through an exception, or a value, applies the provided
    * function.
    */
   def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
@@ -91,7 +91,7 @@ class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
   override def cancel() {
-    jobWaiter.kill()
+    jobWaiter.cancel()
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 727454725d..58f238d8cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -39,7 +39,12 @@ private[spark] class JobWaiter[T](
   // partition RDDs), we set the jobResult directly to JobSucceeded.
   private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
 
-  def kill() {
+  /**
+   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
+   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
+   * will fail this job with a SparkException.
+   */
+  def cancel() {
     dagScheduler.cancelJob(jobId)
   }
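
For context on the renamed method: cancellation is fire-and-forget from the caller's side, as the new doc comment describes. Below is a minimal usage sketch, assuming the async action API from this branch (countAsync reached through the SparkContext implicits); the object name and app name are invented for the example.

// Hypothetical sketch, not part of this patch: cancelling a running job through
// the FutureAction returned by an async action. FutureAction.cancel() delegates to
// JobWaiter.cancel(), which asks the DAGScheduler to cancel the job asynchronously;
// the job later fails with a SparkException once its tasks have been killed.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // assumed to bring in the async action implicits

object CancelSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "cancel-sketch")
    // countAsync returns a FutureAction[Long] instead of blocking for the result.
    val future = sc.parallelize(1 to 10000000, 100).map(_ + 1).countAsync()
    // Request cancellation; this call returns immediately.
    future.cancel()
    sc.stop()
  }
}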
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 1198bac6dd..7e0667ba03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -461,54 +461,52 @@ private[spark] class ClusterTaskSetManager(
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       reason.foreach {
-        _ match {
-          case fetchFailed: FetchFailed =>
-            logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-            sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-            successful(index) = true
-            tasksSuccessful += 1
-            sched.taskSetFinished(this)
-            removeAllRunningTasks()
-            return
-
-          case TaskKilled =>
-            logInfo("Task %d was killed.".format(tid))
-            abort("Task %d was killed".format(tid))
-            return
-
-          case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-            val key = ef.description
-            val now = clock.getTime()
-            val (printFull, dupCount) = {
-              if (recentExceptions.contains(key)) {
-                val (dupCount, printTime) = recentExceptions(key)
-                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                  recentExceptions(key) = (0, now)
-                  (true, 0)
-                } else {
-                  recentExceptions(key) = (dupCount + 1, printTime)
-                  (false, dupCount + 1)
-                }
-              } else {
+        case fetchFailed: FetchFailed =>
+          logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+
+        case TaskKilled =>
+          logInfo("Task %d was killed.".format(tid))
+          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+
+        case ef: ExceptionFailure =>
+          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          val key = ef.description
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
                 recentExceptions(key) = (0, now)
                 (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
               }
-            }
-            if (printFull) {
-              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s\n%s".format(
-                ef.className, ef.description, locs.mkString("\n")))
             } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+              recentExceptions(key) = (0, now)
+              (true, 0)
             }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logInfo("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
 
-          case TaskResultLost =>
-            logInfo("Lost result for TID %s on host %s".format(tid, info.host))
-            sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        case TaskResultLost =>
+          logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
 
-          case _ => {}
-        }
+        case _ => {}
       }
       // On non-fetch failures, re-enqueue the task as pending for a max number of retries
       addPendingTask(index)
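
An aside on the ClusterTaskSetManager change above: in Scala, a brace-delimited block of case clauses is itself an anonymous function, so it can be passed directly to foreach; the old _ match { ... } wrapper only added a level of nesting. A small standalone illustration follows; all names are invented for the example.

// Standalone sketch: a case-clause block passed to Option.foreach is already a
// function, so wrapping it in `_ match { ... }` is redundant.
object MatchInForeachSketch {
  sealed trait Reason
  case object Killed extends Reason
  case class Failed(msg: String) extends Reason

  def handle(reason: Option[Reason]) {
    // Equivalent to: reason.foreach { _ match { case ... } }
    reason.foreach {
      case Killed      => println("task killed")
      case Failed(msg) => println("task failed: " + msg)
    }
  }

  def main(args: Array[String]) {
    handle(Some(Failed("fetch failure")))  // prints: task failed: fetch failure
    handle(None)                           // nothing to do
  }
}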