3 files changed, 47 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9185b9529c..85018cb046 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -56,7 +56,7 @@ trait FutureAction[T] extends Future[T] {
   override def result(atMost: Duration)(implicit permit: CanAwait): T
 
   /**
-   * When this action is completed, either through an exception, or a value, apply the provided
+   * When this action is completed, either through an exception, or a value, applies the provided
    * function.
    */
   def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
@@ -91,7 +91,7 @@ class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
   override def cancel() {
-    jobWaiter.kill()
+    jobWaiter.cancel()
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 727454725d..58f238d8cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -39,7 +39,12 @@ private[spark] class JobWaiter[T](
   // partition RDDs), we set the jobResult directly to JobSucceeded.
   private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
 
-  def kill() {
+  /**
+   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
+   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
+   * will fail this job with a SparkException.
+   */
+  def cancel() {
     dagScheduler.cancelJob(jobId)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 1198bac6dd..7e0667ba03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -461,54 +461,52 @@ private[spark] class ClusterTaskSetManager(
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       reason.foreach {
-        _ match {
-          case fetchFailed: FetchFailed =>
-            logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-            sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-            successful(index) = true
-            tasksSuccessful += 1
-            sched.taskSetFinished(this)
-            removeAllRunningTasks()
-            return
-
-          case TaskKilled =>
-            logInfo("Task %d was killed.".format(tid))
-            abort("Task %d was killed".format(tid))
-            return
-
-          case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-            val key = ef.description
-            val now = clock.getTime()
-            val (printFull, dupCount) = {
-              if (recentExceptions.contains(key)) {
-                val (dupCount, printTime) = recentExceptions(key)
-                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                  recentExceptions(key) = (0, now)
-                  (true, 0)
-                } else {
-                  recentExceptions(key) = (dupCount + 1, printTime)
-                  (false, dupCount + 1)
-                }
-              } else {
+        case fetchFailed: FetchFailed =>
+          logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+
+        case TaskKilled =>
+          logInfo("Task %d was killed.".format(tid))
+          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+
+        case ef: ExceptionFailure =>
+          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          val key = ef.description
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
                 recentExceptions(key) = (0, now)
                 (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
               }
-            }
-            if (printFull) {
-              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s\n%s".format(
-                ef.className, ef.description, locs.mkString("\n")))
             } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+              recentExceptions(key) = (0, now)
+              (true, 0)
             }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logInfo("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
 
-          case TaskResultLost =>
-            logInfo("Lost result for TID %s on host %s".format(tid, info.host))
-            sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        case TaskResultLost =>
+          logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
 
-          case _ => {}
-        }
+        case _ => {}
       }
       // On non-fetch failures, re-enqueue the task as pending for a max number of retries
       addPendingTask(index)
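For context, a minimal usage sketch of the cancellation path this diff renames (not part of the commit): FutureAction.cancel() delegates to JobWaiter.cancel(), which asynchronously asks the DAGScheduler to cancel the job; once the low-level scheduler has killed the job's tasks, the job fails with a SparkException and the failure is delivered through onComplete. The countAsync entry point and the SparkContext._ implicit used below to obtain a FutureAction are assumptions for illustration only; cancel() and onComplete() come from the FutureAction trait shown above.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // assumed implicit conversion providing async actions

object CancelSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "cancel-sketch")

    // Assumed async action returning a FutureAction (backed by a FutureJob/JobWaiter pair).
    val job = sc.parallelize(1 to 1000000, 100).countAsync()

    // From FutureAction: applies the given function when the action completes.
    job.onComplete {
      case Success(count) => println("Job completed with count " + count)
      // After cancel(), the tasks are killed and the job fails with a SparkException,
      // which arrives here as a Failure.
      case Failure(e) => println("Job failed or was cancelled: " + e)
    }

    // Asynchronous: signals the DAGScheduler via JobWaiter.cancel() and returns immediately.
    job.cancel()

    sc.stop()
  }
}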