aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-04-08 17:23:32 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-08 17:23:32 -0700
commit813e96e6faee44079eb52acbdc6c8aa58fb8d191 (patch)
treec9b8bfd92499f34c3a3b82a4ea5c16af59e2c7dd /sql
parent4d7c35926371f9e016577987c037abcf984443d9 (diff)
downloadspark-813e96e6faee44079eb52acbdc6c8aa58fb8d191.tar.gz
spark-813e96e6faee44079eb52acbdc6c8aa58fb8d191.tar.bz2
spark-813e96e6faee44079eb52acbdc6c8aa58fb8d191.zip
[SPARK-14454] Better exception handling while marking tasks as failed
## What changes were proposed in this pull request? This patch adds support for better handling of exceptions inside catch blocks if the code within the block throws an exception. For instance here is the code in a catch block before this change in `WriterContainer.scala`: ```scala logError("Aborting task.", cause) // call failure callbacks first, so we could have a chance to cleanup the writer. TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause) if (currentWriter != null) { currentWriter.close() } abortTask() throw new SparkException("Task failed while writing rows.", cause) ``` If `markTaskFailed` or `currentWriter.close` throws an exception, we currently lose the original cause. This PR fixes this problem by implementing a utility function `Utils.tryWithSafeCatch` that suppresses (`Throwable.addSuppressed`) the exception that are thrown within the catch block and rethrowing the original exception. ## How was this patch tested? No new functionality added Author: Sameer Agarwal <sameer@databricks.com> Closes #12234 from sameeragarwal/fix-exception.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala71
1 files changed, 34 insertions, 37 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index d2bbf196cb..b9a3162aba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/** A container for all the details required when writing to a table. */
case class WriteRelation(
@@ -247,19 +247,16 @@ private[sql] class DefaultWriterContainer(
// If anything below fails, we should abort the task.
try {
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- writer.writeInternal(internalRow)
- }
-
- commitTask()
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ while (iterator.hasNext) {
+ val internalRow = iterator.next()
+ writer.writeInternal(internalRow)
+ }
+ commitTask()
+ }(catchBlock = abortTask())
} catch {
- case cause: Throwable =>
- logError("Aborting task.", cause)
- // call failure callbacks first, so we could have a chance to cleanup the writer.
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
- abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
+ case t: Throwable =>
+ throw new SparkException("Task failed while writing rows", t)
}
def commitTask(): Unit = {
@@ -413,37 +410,37 @@ private[sql] class DynamicPartitionWriterContainer(
// If anything below fails, we should abort the task.
var currentWriter: OutputWriter = null
try {
- var currentKey: UnsafeRow = null
- while (sortedIterator.next()) {
- val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
- if (currentKey != nextKey) {
- if (currentWriter != null) {
- currentWriter.close()
- currentWriter = null
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ var currentKey: UnsafeRow = null
+ while (sortedIterator.next()) {
+ val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+ if (currentKey != nextKey) {
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
+ }
+ currentKey = nextKey.copy()
+ logDebug(s"Writing partition: $currentKey")
+
+ currentWriter = newOutputWriter(currentKey, getPartitionString)
}
- currentKey = nextKey.copy()
- logDebug(s"Writing partition: $currentKey")
-
- currentWriter = newOutputWriter(currentKey, getPartitionString)
+ currentWriter.writeInternal(sortedIterator.getValue)
+ }
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
}
- currentWriter.writeInternal(sortedIterator.getValue)
- }
- if (currentWriter != null) {
- currentWriter.close()
- currentWriter = null
- }
- commitTask()
- } catch {
- case cause: Throwable =>
- logError("Aborting task.", cause)
- // call failure callbacks first, so we could have a chance to cleanup the writer.
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+ commitTask()
+ }(catchBlock = {
if (currentWriter != null) {
currentWriter.close()
}
abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
+ })
+ } catch {
+ case t: Throwable =>
+ throw new SparkException("Task failed while writing rows", t)
}
}
}