aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorJason Moore <jasonmoore2k@outlook.com>2016-04-09 23:34:57 -0700
committerAndrew Or <andrew@databricks.com>2016-04-09 23:34:57 -0700
commit22014e6fb919a35c31d852b7c2f5b7eb05751208 (patch)
tree056fcecf28966453066591ae350dadcf5e252deb /core/src/main/scala
parent2c95e4e966b90d2a315350608d4b21b0381dfd11 (diff)
downloadspark-22014e6fb919a35c31d852b7c2f5b7eb05751208.tar.gz
spark-22014e6fb919a35c31d852b7c2f5b7eb05751208.tar.bz2
spark-22014e6fb919a35c31d852b7c2f5b7eb05751208.zip
[SPARK-14357][CORE] Properly handle the root cause being a commit denied exception
## What changes were proposed in this pull request? When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception. ## How was this patch tested? Added a test suite for the component that extracts the root cause of the error. Made a distribution after cherry-picking this commit to branch-1.6 and used to run our Spark application that would quite often fail due to the CommitDeniedException. Author: Jason Moore <jasonmoore2k@outlook.com> Closes #12228 from jasonmoore2k/SPARK-14357.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/CausedBy.scala36
2 files changed, 37 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 09c5733565..afa4d6093a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -321,7 +321,7 @@ private[spark] class Executor(
logInfo(s"Executor killed $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
- case cDE: CommitDeniedException =>
+ case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
diff --git a/core/src/main/scala/org/apache/spark/util/CausedBy.scala b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
new file mode 100644
index 0000000000..73df446d98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Extractor Object for pulling out the root cause of an error.
+ * If the error contains no cause, it will return the error itself.
+ *
+ * Usage:
+ * try {
+ * ...
+ * } catch {
+ * case CausedBy(ex: CommitDeniedException) => ...
+ * }
+ */
+private[spark] object CausedBy {
+
+ def unapply(e: Throwable): Option[Throwable] = {
+ Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
+ }
+}