author     Rui Li <rui.li@intel.com>                  2017-01-05 14:51:13 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>   2017-01-05 14:51:13 -0800
commit     f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6 (patch)
tree       51d688be41c3fa81e86f9b4e7a19143601497928
parent     30345c43b7d17bb00184b60a547225bae8ee78e7 (diff)
[SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason
## What changes were proposed in this pull request?

TaskResultGetter tries to deserialize the TaskEndReason before handling the failed task. If an error is thrown during deserialization, the failed task is never handled, which leaves the job hanging. This PR handles the failed task in a `finally` block so the scheduler is always notified.

## How was this patch tested?

In my case I hit a NoClassDefFoundError and the job hung. Manually verified that the patch fixes it.

Author: Rui Li <rui.li@intel.com>
Author: Rui Li <lirui@apache.org>
Author: Rui Li <shlr@cn.ibm.com>

Closes #12775 from lirui-intel/SPARK-14958.
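The crux of the bug is the JVM throwable hierarchy: `NoClassDefFoundError` extends `Error`, not `Exception`, so the pre-patch `case ex: Exception` clause never intercepts it, and the `handleFailedTask` call that followed the `try`/`catch` was skipped. A minimal, self-contained sketch of the control flow — `deserializeReason` and `notifyScheduler` are stand-in names for illustration, not Spark's APIs:

```scala
// Sketch only: stand-in names, not the real TaskResultGetter code.
object FinallySketch {
  // Simulates deserialization failing with an Error (not an Exception).
  def deserializeReason(): String =
    throw new NoClassDefFoundError("org.example.MissingTaskEndReason")

  def notifyScheduler(reason: String): Unit =
    println(s"scheduler notified, reason = $reason")

  def main(args: Array[String]): Unit = {
    var reason = "UnknownReason"
    try {
      try {
        reason = deserializeReason()
      } catch {
        // NoClassDefFoundError extends Error, so this clause never fires;
        // pre-patch, the notification placed after the try/catch was skipped.
        case ex: Exception => // No-op
      } finally {
        // Post-patch behavior: the scheduler hears about the failure
        // no matter what was thrown above.
        notifyScheduler(reason)
      }
    } catch {
      case t: Throwable => println(s"Runnable would die with: $t")
    }
  }
}
```

A `finally` clause runs whether the block completes normally, throws an `Exception`, or throws an `Error`, which is exactly the guarantee the scheduler notification needs.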
 core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala      |  6 +++++-
 core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 21 ++++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index b1addc128e..a284f7956c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               logError(
                 "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
             case ex: Exception => // No-op
+          } finally {
+            // If there's an error while deserializing the TaskEndReason, this Runnable
+            // will die. Still tell the scheduler about the task failure, to avoid a hang
+            // where the scheduler thinks the task is still running.
+            scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
           }
-          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
       })
     } catch {
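Why this manifested as a hang rather than a crash: the failed-task handling runs in a Runnable on a thread pool, and a Runnable killed by an uncaught `Error` simply dies on its pool thread; nothing propagates back to the code that queued it, so the scheduler keeps waiting on the task. A rough standalone illustration using plain `java.util.concurrent` (not Spark's actual pool):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: a Runnable that dies from an uncaught Error on a pool
// thread never reports back to the submitter, but its finally block still runs.
object PoolDeathSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          throw new NoClassDefFoundError("boom")
        } finally {
          // Without this, the failure would vanish with the worker thread
          // and the waiting caller would hang -- the symptom in SPARK-14958.
          println("finally ran: failure reported despite the Error")
        }
      }
    })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```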
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index c9e682f53c..3e55d399e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
-import java.io.File
+import java.io.{File, ObjectInputStream}
 import java.net.URL
 import java.nio.ByteBuffer
@@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     assert(resSizeAfter.exists(_.toString.toLong > 0L))
   }
+
+  test("failed task is handled when error occurs deserializing the reason") {
+    sc = new SparkContext("local", "test", conf)
+    val rdd = sc.parallelize(Seq(1), 1).map { _ =>
+      throw new UndeserializableException
+    }
+    val message = intercept[SparkException] {
+      rdd.collect()
+    }.getMessage
+    // Job failed, even though the failure reason is unknown.
+    val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
+    assert(unknownFailure.findFirstMatchIn(message).isDefined)
+  }
 }
+
+private class UndeserializableException extends Exception {
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NoClassDefFoundError()
+  }
+}
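The test's `UndeserializableException` works because `java.io` serialization invokes a class's private `readObject(ObjectInputStream)` hook during deserialization: the exception serializes fine on the sending side and only blows up when the receiving side deserializes the failure reason. A standalone round-trip showing the hook firing — class and object names here are illustrative:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustration of the readObject trap used by the test above.
class BoobyTrapped extends Exception {
  // Invoked by ObjectInputStream during deserialization only.
  private def readObject(in: ObjectInputStream): Unit =
    throw new NoClassDefFoundError("raised while deserializing")
}

object ReadObjectDemo {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new BoobyTrapped) // serializing succeeds
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject()
    catch {
      // The Error surfaces on the reading side, mirroring what the driver
      // hits when it cannot deserialize the TaskEndReason.
      case t: Throwable => println(s"deserialization failed with: $t")
    }
  }
}
```

Because the `Error` is raised only at deserialization time, the job fails with `UnknownReason` rather than hanging, which is exactly what the regex assertion in the test checks for.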