diff options
7 files changed, 13 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala index 7d84889a2d..326e042419 100644 --- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala +++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{TaskCommitDenied, TaskEndReason} +import org.apache.spark.{TaskCommitDenied, TaskFailedReason} /** * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver. @@ -29,5 +29,5 @@ private[spark] class CommitDeniedException( attemptNumber: Int) extends Exception(msg) { - def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber) + def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index fbf2b86db1..668ec41153 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -355,7 +355,7 @@ private[spark] class Executor( } catch { case ffe: FetchFailedException => - val reason = ffe.toTaskEndReason + val reason = ffe.toTaskFailedReason setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) @@ -370,7 +370,7 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) case CausedBy(cDE: CommitDeniedException) => - val reason = cDE.toTaskEndReason + val reason = cDE.toTaskFailedReason setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 685ef55c66..1c3fcbd461 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) { - var reason : TaskEndReason = UnknownReason + var reason : TaskFailedReason = UnknownReason try { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { val loader = Utils.getContextOrSparkClassLoader try { if (serializedData != null && serializedData.limit() > 0) { - reason = serializer.get().deserialize[TaskEndReason]( + reason = serializer.get().deserialize[TaskFailedReason]( serializedData, loader) } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ee5cbfeb47..52a7186cbf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl( taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, - reason: TaskEndReason): Unit = synchronized { + reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { // Need to revive offers again now that the task set manager state has been updated to diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2fef447b0a..226bed284a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -696,7 +696,7 @@ private[spark] class TaskSetManager( * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the * DAG Scheduler. */ - def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) { + def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) { val info = taskInfos(tid) if (info.failed || info.killed) { return @@ -707,7 +707,7 @@ private[spark] class TaskSetManager( copiesRunning(index) -= 1 var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + - reason.asInstanceOf[TaskFailedReason].toErrorString + reason.toErrorString val failureException: Option[Throwable] = reason match { case fetchFailed: FetchFailed => logWarning(failureReason) @@ -765,10 +765,6 @@ private[spark] class TaskSetManager( case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others logWarning(failureReason) None - - case e: TaskEndReason => - logError("Unknown TaskEndReason: " + e) - None } // always add to failed executors failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). @@ -784,9 +780,7 @@ private[spark] class TaskSetManager( addPendingTask(index) } - if (!isZombie && state != TaskState.KILLED - && reason.isInstanceOf[TaskFailedReason] - && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) { + if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) { assert (null != failureReason) numFailures(index) += 1 if (numFailures(index) >= maxTaskFailures) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala index b2d050b218..498c12e196 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala @@ -17,7 +17,7 @@ package org.apache.spark.shuffle -import org.apache.spark.{FetchFailed, TaskEndReason} +import org.apache.spark.{FetchFailed, TaskFailedReason} import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -45,7 +45,7 @@ private[spark] class FetchFailedException( this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause) } - def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, + def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, Utils.exceptionString(this)) } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c89be22a34..00314abf49 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -146,7 +146,7 @@ class JsonProtocolSuite extends SparkFunSuite { val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, "Some exception") val fetchMetadataFailed = new MetadataFetchFailedException(17, - 19, "metadata Fetch failed exception").toTaskEndReason + 19, "metadata Fetch failed exception").toTaskFailedReason val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo]) testTaskEndReason(Success) testTaskEndReason(Resubmitted) |