author    Imran Rashid <irashid@cloudera.com>  2016-09-21 17:49:36 -0400
committer Andrew Or <andrewor14@gmail.com>     2016-09-21 17:49:36 -0400
commit    9fcf1c51d518847eda7f5ea71337cfa7def3c45c
tree      6897409cc22d91c539ca3fec581438087d4a7bd6
parent    2cd1bfa4f0c6625b0ab1dbeba2b9586b9a6a9f42
[SPARK-17623][CORE] Clarify type of TaskEndReason with a failed task.
## What changes were proposed in this pull request?

In TaskResultGetter, enqueueFailedTask currently deserializes the result as a TaskEndReason. But the type is actually more specific: it's a TaskFailedReason. This just leads to more blind casting later on; it would be clearer if the message were cast to the right type immediately, so that method parameter types could be tightened.

## How was this patch tested?

Existing unit tests via Jenkins. Note that the code was already performing a blind cast to a TaskFailedReason in any case, just in a different spot, so there should not be any behavior change.

Author: Imran Rashid <irashid@cloudera.com>

Closes #15181 from squito/SPARK-17623.
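To see why tightening the parameter type removes the casts, here is a minimal, self-contained sketch of the hierarchy. The names mirror the real classes (the actual definitions live in core/src/main/scala/org/apache/spark/TaskEndReason.scala), but the bodies are trimmed stand-ins for illustration, not the Spark source:

```scala
object TaskEndReasonSketch {
  // Wide type: covers both success and every kind of failure.
  sealed trait TaskEndReason
  case object Success extends TaskEndReason

  // Narrow type: every failure reason is a TaskEndReason, but not vice versa.
  sealed trait TaskFailedReason extends TaskEndReason {
    def toErrorString: String
    def countTowardsTaskFailures: Boolean = true
  }
  case object UnknownReason extends TaskFailedReason {
    override def toErrorString: String = "UnknownReason"
  }

  // Before the patch: the failure path accepted the wide type and cast blindly.
  def handleFailedTaskBefore(reason: TaskEndReason): String =
    reason.asInstanceOf[TaskFailedReason].toErrorString

  // After the patch: the parameter type is tightened, so the cast disappears
  // and the compiler rejects non-failure reasons such as Success.
  def handleFailedTaskAfter(reason: TaskFailedReason): String =
    reason.toErrorString
}
```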
 core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala |  4
 core/src/main/scala/org/apache/spark/executor/Executor.scala              |  4
 core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala     |  4
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala    |  2
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala       | 12
 core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala   |  4
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala         |  2
 7 files changed, 13 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index 7d84889a2d..326e042419 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{TaskCommitDenied, TaskEndReason}
+import org.apache.spark.{TaskCommitDenied, TaskFailedReason}
 
 /**
  * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
@@ -29,5 +29,5 @@ private[spark] class CommitDeniedException(
     attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
+  def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fbf2b86db1..668ec41153 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -355,7 +355,7 @@ private[spark] class Executor(
 
       } catch {
         case ffe: FetchFailedException =>
-          val reason = ffe.toTaskEndReason
+          val reason = ffe.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -370,7 +370,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
         case CausedBy(cDE: CommitDeniedException) =>
-          val reason = cDE.toTaskEndReason
+          val reason = cDE.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 685ef55c66..1c3fcbd461 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
-    var reason : TaskEndReason = UnknownReason
+    var reason : TaskFailedReason = UnknownReason
     try {
       getTaskResultExecutor.execute(new Runnable {
         override def run(): Unit = Utils.logUncaughtExceptions {
           val loader = Utils.getContextOrSparkClassLoader
           try {
             if (serializedData != null && serializedData.limit() > 0) {
-              reason = serializer.get().deserialize[TaskEndReason](
+              reason = serializer.get().deserialize[TaskFailedReason](
                 serializedData, loader)
             }
           } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ee5cbfeb47..52a7186cbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskState: TaskState,
-      reason: TaskEndReason): Unit = synchronized {
+      reason: TaskFailedReason): Unit = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
     if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
       // Need to revive offers again now that the task set manager state has been updated to
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2fef447b0a..226bed284a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -696,7 +696,7 @@ private[spark] class TaskSetManager(
    * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
    * DAG Scheduler.
    */
-  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
+  def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
     val info = taskInfos(tid)
     if (info.failed || info.killed) {
       return
@@ -707,7 +707,7 @@ private[spark] class TaskSetManager(
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
-      reason.asInstanceOf[TaskFailedReason].toErrorString
+      reason.toErrorString
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
         logWarning(failureReason)
@@ -765,10 +765,6 @@ private[spark] class TaskSetManager(
       case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
-
-      case e: TaskEndReason =>
-        logError("Unknown TaskEndReason: " + e)
-        None
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -784,9 +780,7 @@ private[spark] class TaskSetManager(
       addPendingTask(index)
     }
 
-    if (!isZombie && state != TaskState.KILLED
-        && reason.isInstanceOf[TaskFailedReason]
-        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
+    if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index b2d050b218..498c12e196 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.{FetchFailed, TaskFailedReason}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
@@ -45,7 +45,7 @@ private[spark] class FetchFailedException(
     this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
   }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+  def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
     Utils.exceptionString(this))
 }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c89be22a34..00314abf49 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -146,7 +146,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
       "Some exception")
     val fetchMetadataFailed = new MetadataFetchFailedException(17,
-      19, "metadata Fetch failed exception").toTaskEndReason
+      19, "metadata Fetch failed exception").toTaskFailedReason
     val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
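A side benefit of the tightened type, visible in the TaskSetManager hunk above, is that the match on the failure reason no longer needs the catch-all "case e: TaskEndReason" arm. The sketch below illustrates that simplification; these are trimmed, hypothetical stand-ins for the real Spark classes (the actual FetchFailed carries block-manager and shuffle details), not the Spark source:

```scala
object MatchSketch {
  // Trimmed stand-ins for the real failure-reason classes.
  sealed trait TaskFailedReason { def toErrorString: String }
  case class FetchFailed(message: String) extends TaskFailedReason {
    override def toErrorString: String = s"FetchFailed: $message"
  }
  case object TaskResultLost extends TaskFailedReason {
    override def toErrorString: String = "TaskResultLost"
  }

  // With reason typed as TaskFailedReason, this match is exhaustive without
  // the old "case e: TaskEndReason => logError(...)" arm, which is exactly
  // the arm the patch deletes from TaskSetManager.handleFailedTask.
  def failureException(reason: TaskFailedReason): Option[Throwable] =
    reason match {
      case ff: FetchFailed => Some(new Exception(ff.toErrorString))
      case _: TaskFailedReason => None // TaskResultLost, TaskKilled, and others
    }
}
```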