Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala              |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala       | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala   |  4
6 files changed, 12 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index 7d84889a2d..326e042419 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{TaskCommitDenied, TaskEndReason}
+import org.apache.spark.{TaskCommitDenied, TaskFailedReason}
/**
* Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
@@ -29,5 +29,5 @@ private[spark] class CommitDeniedException(
attemptNumber: Int)
extends Exception(msg) {
- def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
+ def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
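
For context on why the narrowed return type is safe, here is an abridged sketch of the reason hierarchy these conversions rely on (simplified and illustrative, not the full Spark definitions): TaskFailedReason is the failure-only subset of TaskEndReason, and TaskCommitDenied already extends it, so returning TaskFailedReason directly gives callers toErrorString and countTowardsTaskFailures without a cast.

// Abridged sketch of the hierarchy, not the full Spark source.
sealed trait TaskEndReason
case object Success extends TaskEndReason

sealed trait TaskFailedReason extends TaskEndReason {
  def toErrorString: String
  // Whether this failure counts against the task's allowed failure budget.
  def countTowardsTaskFailures: Boolean = true
}

// Illustrative only: the real case class carries the same three values the
// exception passes along in the hunk above.
case class TaskCommitDenied(jobID: Int, splitID: Int, attemptNumber: Int)
    extends TaskFailedReason {
  override def toErrorString: String =
    s"TaskCommitDenied: job $jobID, split $splitID, attempt $attemptNumber"
  // A denied commit usually means another attempt won the commit race,
  // so (in this sketch) it does not count toward the failure limit.
  override def countTowardsTaskFailures: Boolean = false
}
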
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fbf2b86db1..668ec41153 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -355,7 +355,7 @@ private[spark] class Executor(
} catch {
case ffe: FetchFailedException =>
- val reason = ffe.toTaskEndReason
+ val reason = ffe.toTaskFailedReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -370,7 +370,7 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case CausedBy(cDE: CommitDeniedException) =>
- val reason = cDE.toTaskEndReason
+ val reason = cDE.toTaskFailedReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 685ef55c66..1c3fcbd461 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
serializedData: ByteBuffer) {
- var reason : TaskEndReason = UnknownReason
+ var reason : TaskFailedReason = UnknownReason
try {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
val loader = Utils.getContextOrSparkClassLoader
try {
if (serializedData != null && serializedData.limit() > 0) {
- reason = serializer.get().deserialize[TaskEndReason](
+ reason = serializer.get().deserialize[TaskFailedReason](
serializedData, loader)
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ee5cbfeb47..52a7186cbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState,
- reason: TaskEndReason): Unit = synchronized {
+ reason: TaskFailedReason): Unit = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
// Need to revive offers again now that the task set manager state has been updated to
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2fef447b0a..226bed284a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -696,7 +696,7 @@ private[spark] class TaskSetManager(
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
*/
- def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
+ def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
val info = taskInfos(tid)
if (info.failed || info.killed) {
return
@@ -707,7 +707,7 @@ private[spark] class TaskSetManager(
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
- reason.asInstanceOf[TaskFailedReason].toErrorString
+ reason.toErrorString
val failureException: Option[Throwable] = reason match {
case fetchFailed: FetchFailed =>
logWarning(failureReason)
@@ -765,10 +765,6 @@ private[spark] class TaskSetManager(
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
None
-
- case e: TaskEndReason =>
- logError("Unknown TaskEndReason: " + e)
- None
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -784,9 +780,7 @@ private[spark] class TaskSetManager(
addPendingTask(index)
}
- if (!isZombie && state != TaskState.KILLED
- && reason.isInstanceOf[TaskFailedReason]
- && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
+ if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
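
The TaskSetManager change follows directly from the new parameter type: once the reason is statically a TaskFailedReason, the defensive "case e: TaskEndReason" branch can never match, and toErrorString / countTowardsTaskFailures no longer need casts. A trimmed, hypothetical sketch of the resulting match shape is below; the real handleFailedTask also handles ExceptionFailure and updates accumulators, failedExecutors, and the pending-task list.

// Hypothetical helper illustrating the simplified match; not the actual Spark method.
import org.apache.spark.{FetchFailed, TaskFailedReason}

object FailureHandlingSketch {
  def failureExceptionFor(reason: TaskFailedReason): Option[Throwable] = reason match {
    case fetchFailed: FetchFailed =>
      // The stage will be resubmitted; there is no remote exception to surface.
      None
    case _ =>
      // TaskResultLost, TaskKilled, TaskCommitDenied, UnknownReason, and others.
      // No catch-all TaskEndReason branch is needed for exhaustiveness anymore.
      None
  }
}
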
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index b2d050b218..498c12e196 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
-import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.{FetchFailed, TaskFailedReason}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
@@ -45,7 +45,7 @@ private[spark] class FetchFailedException(
this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
}
- def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+ def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
Utils.exceptionString(this))
}
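
End to end, the rename keeps the failure type intact across the executor/driver boundary: the executor serializes a TaskFailedReason, and TaskResultGetter deserializes the same type before handing it to handleFailedTask. A rough usage sketch follows; the helper names and placement (inside the Spark source tree, since FetchFailedException is private[spark]) are assumptions, not Spark's exact call sites.

// Rough, illustrative sketch of the serialize/deserialize round trip.
import java.nio.ByteBuffer

import org.apache.spark.TaskFailedReason
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.shuffle.FetchFailedException

object FailureReasonRoundTripSketch {
  // Executor side: convert the exception and ship a statically failure-typed reason.
  def serializeFailure(ser: SerializerInstance, ffe: FetchFailedException): ByteBuffer =
    ser.serialize[TaskFailedReason](ffe.toTaskFailedReason)

  // Driver side (as in TaskResultGetter above): read back the same type, so no
  // downcast is needed before TaskSetManager.handleFailedTask consumes it.
  def deserializeFailure(ser: SerializerInstance, data: ByteBuffer,
      loader: ClassLoader): TaskFailedReason =
    ser.deserialize[TaskFailedReason](data, loader)
}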