Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala  71
1 file changed, 34 insertions(+), 37 deletions(-)
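The change below replaces the hand-rolled failure handling in both DefaultWriterContainer and DynamicPartitionWriterContainer with the shared Utils.tryWithSafeFinallyAndFailureCallbacks helper, so that marking the task as failed and cleaning up the writer are no longer repeated inline in each catch block.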
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index d2bbf196cb..b9a3162aba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /** A container for all the details required when writing to a table. */
 case class WriteRelation(
@@ -247,19 +247,16 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      while (iterator.hasNext) {
-        val internalRow = iterator.next()
-        writer.writeInternal(internalRow)
-      }
-
-      commitTask()
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writer.writeInternal(internalRow)
+        }
+        commitTask()
+      }(catchBlock = abortTask())
     } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
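For readers unfamiliar with the helper: it runs a block and, if the block throws, performs the failure bookkeeping and cleanup that the deleted catch clause used to do by hand before rethrowing. Below is a minimal sketch of that contract in Scala, assuming a simplified signature; the real Utils.tryWithSafeFinallyAndFailureCallbacks also marks the task as failed via the TaskContext (the deleted markTaskFailed call) so registered failure callbacks run before cleanup, and accepts an optional finally block.

// Illustrative sketch only, not Spark's implementation.
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(catchBlock: => Unit): T = {
  try {
    block
  } catch {
    case t: Throwable =>
      // The real helper first marks the task as failed here, so that
      // registered failure callbacks get a chance to clean up the writer.
      try {
        catchBlock  // best-effort cleanup, e.g. abortTask()
      } catch {
        case suppressed: Throwable =>
          // A failing cleanup must not mask the original error.
          t.addSuppressed(suppressed)
      }
      throw t
  }
}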
@@ -413,37 +410,37 @@ private[sql] class DynamicPartitionWriterContainer(
 
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      var currentKey: UnsafeRow = null
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          if (currentWriter != null) {
-            currentWriter.close()
-            currentWriter = null
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        var currentKey: UnsafeRow = null
+        while (sortedIterator.next()) {
+          val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+          if (currentKey != nextKey) {
+            if (currentWriter != null) {
+              currentWriter.close()
+              currentWriter = null
+            }
+            currentKey = nextKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+
+            currentWriter = newOutputWriter(currentKey, getPartitionString)
           }
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
-
-          currentWriter = newOutputWriter(currentKey, getPartitionString)
+          currentWriter.writeInternal(sortedIterator.getValue)
+        }
+        if (currentWriter != null) {
+          currentWriter.close()
+          currentWriter = null
         }
-        currentWriter.writeInternal(sortedIterator.getValue)
-      }
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-      commitTask()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        commitTask()
+      }(catchBlock = {
         if (currentWriter != null) {
           currentWriter.close()
         }
         abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
   }
 }
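With that shape, both hunks follow the same pattern: the write loop and commitTask() run inside the guarded block, abortTask() plus any open-writer cleanup moves into catchBlock, and the outer catch is reduced to wrapping the error in a SparkException. A hypothetical standalone usage, reusing the sketch helper above (assume it is in scope; writeRecord, commitTask, and abortTask are stand-in stubs, not Spark APIs):

object WriterSketch {
  // Stand-in stubs for the task lifecycle; purely illustrative.
  private def writeRecord(row: String): Unit = println(s"write: $row")
  private def commitTask(): Unit = println("commit")
  private def abortTask(): Unit = println("abort")

  def writeRows(rows: Iterator[String]): Unit = {
    try {
      tryWithSafeFinallyAndFailureCallbacks {
        rows.foreach(writeRecord)
        commitTask()  // reached only if every write succeeded
      }(catchBlock = abortTask())  // cleanup runs once, before the rethrow
    } catch {
      case t: Throwable =>
        // Stands in for SparkException, to keep the sketch dependency-free.
        throw new RuntimeException("Task failed while writing rows", t)
    }
  }
}

Note that in the dynamic-partition hunk, currentWriter is deliberately declared outside the guarded block so that catchBlock can still close a half-written output file before aborting the task.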