Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 91
1 file changed, 40 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aa..815d1d01ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /** A container for all the details required when writing to a table. */
 case class WriteRelation(
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
       outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
     } catch {
       case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
-          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+        if (outputCommitter.getClass.getName.contains("Direct")) {
+          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
           // attempts, the task will fail because the output file is created from a prior attempt.
           // This often means the most visible error to the user is misleading. Augment the error
           // to tell the user to look for the actual error.
           throw new SparkException("The output file already exists but this could be due to a " +
             "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n File exists error: " + e)
+            "the first error.\n File exists error: " + e, e)
+        } else {
+          throw e
         }
-        throw e
     }
   }
 
@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
         s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
           "for appending.")
       defaultOutputCommitter
-    } else if (speculationEnabled) {
-      // When speculation is enabled, it's not safe to use customized output committer classes,
-      // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
-      //
-      // See SPARK-9899 for more details.
-      logInfo(
-        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
-          "because spark.speculation is configured to be true.")
-      defaultOutputCommitter
     } else {
       val configuration = context.getConfiguration
       val committerClass = configuration.getClass(
@@ -255,19 +247,16 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      while (iterator.hasNext) {
-        val internalRow = iterator.next()
-        writer.writeInternal(internalRow)
-      }
-
-      commitTask()
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writer.writeInternal(internalRow)
+        }
+        commitTask()
+      }(catchBlock = abortTask())
     } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
@@ -421,37 +410,37 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      var currentKey: UnsafeRow = null
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          if (currentWriter != null) {
-            currentWriter.close()
-            currentWriter = null
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        var currentKey: UnsafeRow = null
+        while (sortedIterator.next()) {
+          val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+          if (currentKey != nextKey) {
+            if (currentWriter != null) {
+              currentWriter.close()
+              currentWriter = null
+            }
+            currentKey = nextKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+
+            currentWriter = newOutputWriter(currentKey, getPartitionString)
           }
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
-
-          currentWriter = newOutputWriter(currentKey, getPartitionString)
+          currentWriter.writeInternal(sortedIterator.getValue)
+        }
+        if (currentWriter != null) {
+          currentWriter.close()
+          currentWriter = null
         }
-        currentWriter.writeInternal(sortedIterator.getValue)
-      }
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-      commitTask()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        commitTask()
+      }(catchBlock = {
+        if (currentWriter != null) {
+          currentWriter.close()
+        }
         abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
    }
   }
 }
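Note on the first hunk: the extra argument in "+ e, e)" passes the original FileAlreadyExistsException as the cause of the wrapping SparkException, so the underlying stack trace survives the friendlier message. A minimal illustration of the same chaining, using plain JVM exceptions rather than Spark's:

object CauseChaining {
  def main(args: Array[String]): Unit = {
    try {
      throw new java.io.IOException("file exists")
    } catch {
      case e: java.io.IOException =>
        // Wrap with a clearer message but keep the original as the cause,
        // mirroring the "+ e, e)" change above.
        val wrapped = new RuntimeException(
          "The output file already exists; check earlier logs for the first error", e)
        assert(wrapped.getCause eq e)  // original stack trace still reachable
    }
  }
}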
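The headline change replaces two hand-rolled catch blocks with Utils.tryWithSafeFinallyAndFailureCallbacks, centralizing the deleted logError / markTaskFailed / cleanup boilerplate. A minimal standalone sketch of that control flow, assuming the helper's contract is simply "run the caller's cleanup on failure, then rethrow the original error"; the real Spark helper additionally invokes the TaskContext failure callbacks (the markTaskFailed call removed above), and its exact signature may differ:

object SafeTry {
  // Sketch only: the real Utils.tryWithSafeFinallyAndFailureCallbacks also
  // marks the task as failed on the TaskContext before running cleanup.
  def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
    try {
      block
    } catch {
      case cause: Throwable =>
        // Run the caller's cleanup, but never let a cleanup failure
        // mask the original exception.
        try {
          catchBlock
        } catch {
          case inner: Throwable => cause.addSuppressed(inner)
        }
        throw cause
    }
  }
}

Usage mirroring the DefaultWriterContainer hunk:

SafeTry.tryWithFailureCallback {
  // write all rows, then commit
}(catchBlock = /* abort the task */ ())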
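The loop wrapped in the last hunk is a sorted-group writer: rows arrive from the UnsafeKVExternalSorter ordered by partition/bucket key, so at most one OutputWriter is open at a time and it is rolled whenever the key changes (nextKey.copy() guards against the sorter reusing its backing row buffer). A toy sketch of the same control flow, with a hypothetical Writer trait and openWriter factory standing in for OutputWriter and newOutputWriter; the input must already be sorted by key:

trait Writer {
  def write(value: String): Unit
  def close(): Unit
}

object SortedGroupWrite {
  // Hypothetical stand-in for newOutputWriter.
  def openWriter(key: Int): Writer = new Writer {
    def write(value: String): Unit = println(s"key=$key value=$value")
    def close(): Unit = println(s"key=$key closed")
  }

  def writeSorted(rows: Iterator[(Int, String)]): Unit = {
    var currentKey: Option[Int] = None
    var currentWriter: Writer = null
    try {
      for ((key, value) <- rows) {
        if (!currentKey.contains(key)) {
          // Key changed: close the writer for the previous group
          // and open one for the new group.
          if (currentWriter != null) currentWriter.close()
          currentKey = Some(key)
          currentWriter = openWriter(key)
        }
        currentWriter.write(value)
      }
    } finally {
      // Mirrors the catchBlock above: always release the open writer.
      if (currentWriter != null) currentWriter.close()
    }
  }

  def main(args: Array[String]): Unit = {
    writeSorted(Iterator(1 -> "a", 1 -> "b", 2 -> "c"))
  }
}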