Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala  91
1 file changed, 40 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aa..815d1d01ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/** A container for all the details required when writing to a table. */
case class WriteRelation(
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
- if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
- // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+ if (outputCommitter.getClass.getName.contains("Direct")) {
+ // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
// attempts, the task will fail because the output file is created from a prior attempt.
// This often means the most visible error to the user is misleading. Augment the error
// to tell the user to look for the actual error.
throw new SparkException("The output file already exists but this could be due to a " +
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
- "the first error.\n File exists error: " + e)
+ "the first error.\n File exists error: " + e, e)
+ } else {
+ throw e
}
- throw e
}
}
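
Note on the hunk above: the old code matched the committer with isInstanceOf against parquet.DirectParquetOutputCommitter; the new code matches any committer whose class name contains "Direct", and it now chains the original FileAlreadyExistsException as the cause so the real stack trace survives in the SparkException. A minimal sketch of the name-based check (illustrative helper, not Spark's exact code):

    import org.apache.hadoop.mapreduce.OutputCommitter

    // Heuristic from the hunk above: treat any committer whose class name
    // contains "Direct" (e.g. DirectParquetOutputCommitter) as a direct,
    // non-idempotent committer whose retries can hit leftover output files.
    def isDirectCommitter(committer: OutputCommitter): Boolean =
      committer.getClass.getName.contains("Direct")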
@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
- } else if (speculationEnabled) {
- // When speculation is enabled, it's not safe to use customized output committer classes,
- // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
- //
- // See SPARK-9899 for more details.
- logInfo(
- s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
- "because spark.speculation is configured to be true.")
- defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(
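
The hunk above drops the speculation special case from committer selection; the surviving else branch looks the custom committer class up in the Hadoop configuration. A hedged sketch of that lookup, using the key literally for illustration (Spark reads it via SQLConf.OUTPUT_COMMITTER_CLASS rather than a string literal):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.OutputCommitter

    // Resolve a user-supplied committer class, falling back to null (meaning
    // "use the output format's default committer") when the key is unset.
    val configuration = new Configuration()
    val committerClass: Class[_ <: OutputCommitter] = configuration.getClass(
      "spark.sql.sources.outputCommitterClass", null, classOf[OutputCommitter])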
@@ -255,19 +247,16 @@ private[sql] class DefaultWriterContainer(
// If anything below fails, we should abort the task.
try {
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- writer.writeInternal(internalRow)
- }
-
- commitTask()
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ while (iterator.hasNext) {
+ val internalRow = iterator.next()
+ writer.writeInternal(internalRow)
+ }
+ commitTask()
+ }(catchBlock = abortTask())
} catch {
- case cause: Throwable =>
- logError("Aborting task.", cause)
- // call failure callbacks first, so we could have a chance to cleanup the writer.
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
- abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
+ case t: Throwable =>
+ throw new SparkException("Task failed while writing rows", t)
}
def commitTask(): Unit = {
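
Both writer containers now route the write loop through Utils.tryWithSafeFinallyAndFailureCallbacks instead of hand-rolling the markTaskFailed/abortTask/rethrow sequence. A simplified stand-in for the control flow that utility provides (the real one also fires the task's failure callbacks via TaskContext, which is what replaces the explicit markTaskFailed call deleted above):

    // Simplified sketch, not Spark's implementation: run `block`; on failure,
    // run `catchBlock` for cleanup, but never let a cleanup failure mask the
    // original error.
    def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
      try {
        block
      } catch {
        case t: Throwable =>
          try {
            catchBlock
          } catch {
            case cleanup: Throwable => t.addSuppressed(cleanup)
          }
          throw t
      }
    }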
@@ -421,37 +410,37 @@ private[sql] class DynamicPartitionWriterContainer(
// If anything below fails, we should abort the task.
var currentWriter: OutputWriter = null
try {
- var currentKey: UnsafeRow = null
- while (sortedIterator.next()) {
- val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
- if (currentKey != nextKey) {
- if (currentWriter != null) {
- currentWriter.close()
- currentWriter = null
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ var currentKey: UnsafeRow = null
+ while (sortedIterator.next()) {
+ val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+ if (currentKey != nextKey) {
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
+ }
+ currentKey = nextKey.copy()
+ logDebug(s"Writing partition: $currentKey")
+
+ currentWriter = newOutputWriter(currentKey, getPartitionString)
}
- currentKey = nextKey.copy()
- logDebug(s"Writing partition: $currentKey")
-
- currentWriter = newOutputWriter(currentKey, getPartitionString)
+ currentWriter.writeInternal(sortedIterator.getValue)
+ }
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
}
- currentWriter.writeInternal(sortedIterator.getValue)
- }
- if (currentWriter != null) {
- currentWriter.close()
- currentWriter = null
- }
- commitTask()
- } catch {
- case cause: Throwable =>
- logError("Aborting task.", cause)
- // call failure callbacks first, so we could have a chance to cleanup the writer.
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+ commitTask()
+ }(catchBlock = {
if (currentWriter != null) {
currentWriter.close()
}
abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
+ })
+ } catch {
+ case t: Throwable =>
+ throw new SparkException("Task failed while writing rows", t)
}
}
}
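
The DynamicPartitionWriterContainer hunk keeps the existing one-writer-at-a-time strategy: rows arrive from the UnsafeKVExternalSorter sorted by partition/bucket key, so a new writer is opened on each key change and the previous one is closed first. A generic sketch of that pattern with illustrative names (Writer stands in for OutputWriter):

    trait Writer {
      def write(value: String): Unit
      def close(): Unit
    }

    // Rows must arrive sorted by key: open one writer per key group, closing
    // the previous writer before switching, and close the last writer (on
    // success or failure) in the finally block.
    def writeSortedGroups(
        rows: Iterator[(String, String)],
        newWriter: String => Writer): Unit = {
      var currentKey: String = null
      var currentWriter: Writer = null
      try {
        for ((key, value) <- rows) {
          if (key != currentKey) {
            if (currentWriter != null) currentWriter.close()
            currentKey = key
            currentWriter = newWriter(key)
          }
          currentWriter.write(value)
        }
      } finally {
        if (currentWriter != null) currentWriter.close()
      }
    }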