 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala                      | 22 ++++++++++++++++++++--
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala |  3 ++-
 2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 1b59b19d94..ad55367258 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
}
}
+ protected def newOutputWriter(path: String): OutputWriter = {
+ try {
+ outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+ } catch {
+ case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+ if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+ // SPARK-11382: DirectParquetOutputCommitter is not idempotent, so on retry
+ // attempts the task fails because the output file was already created by a
+ // prior attempt. The most visible error shown to the user is then misleading;
+ // augment it to point the user at the actual root cause.
+ throw new SparkException("The output file already exists but this could be due to a " +
+ "failure from an earlier attempt. Look through the earlier logs or stage page for " +
+ "the first error.\n File exists error: " + e)
+ }
+ throw e
+ }
+ }
+
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
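
The new newOutputWriter above centralizes a wrap-and-rethrow: a FileAlreadyExistsException raised under a non-idempotent committer is rethrown with a hint that the real failure happened in an earlier attempt. A minimal standalone sketch of the same pattern (the helper name and message are illustrative, not Spark's API):

    import org.apache.hadoop.fs.FileAlreadyExistsException

    // Hypothetical helper mirroring the guard added in the diff: run a writer
    // factory, and if the output file already exists under a non-idempotent
    // committer, rethrow with a pointer to the root cause instead of the
    // misleading symptom.
    def withClearerError[T](nonIdempotentCommitter: Boolean)(create: => T): T =
      try create catch {
        case e: FileAlreadyExistsException if nonIdempotentCommitter =>
          throw new RuntimeException(
            "The output file already exists, likely because an earlier attempt " +
            "of this task failed and a retry collided with its output. Check " +
            "the earlier logs or the stage page for the first error.", e)
      }
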
@@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer(
executorSideSetup(taskContext)
val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
configuration.set("spark.sql.sources.output.path", outputPath)
- val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+ val writer = newOutputWriter(getWorkPath)
writer.initConverter(dataSchema)
var writerClosed = false
@@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
configuration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
- val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+ val newWriter = super.newOutputWriter(path.toString)
newWriter.initConverter(dataSchema)
newWriter
}
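
Both call sites above now go through the guarded newOutputWriter, so a retried or speculative task surfaces the augmented message rather than a bare file-exists error. A self-contained sketch of what that looks like (the file name and the wrapper exception are made up for illustration; the real code throws SparkException):

    import org.apache.hadoop.fs.FileAlreadyExistsException

    object RetryMessageDemo extends App {
      // Simulates attempt 2 of a task finding the file left behind by attempt 1.
      def newInstance(): Nothing =
        throw new FileAlreadyExistsException("/out/part-r-00000.gz.parquet")

      try newInstance() catch {
        case e: FileAlreadyExistsException =>
          val augmented = new RuntimeException(
            "The output file already exists but this could be due to a failure " +
            "from an earlier attempt. Look through the earlier logs or stage " +
            "page for the first error.\n File exists error: " + e)
          println(augmented.getMessage)
      }
    }
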
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 300e8677b3..1a4e99ff10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -41,7 +41,8 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
* no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
* left empty).
*/
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+private[datasources] class DirectParquetOutputCommitter(
+ outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
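
For context on why this committer is not idempotent: it writes task output straight to the final location, so a retry's create collides with the first attempt's file, whereas a conventional committer stages output and publishes it with a rename on commit. A simplified contrast under that assumption (illustrative only, not the actual ParquetOutputCommitter logic):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Direct commit: create the final file immediately. A second attempt fails
    // here with FileAlreadyExistsException because overwrite is false.
    def directWrite(fs: FileSystem, finalPath: Path): Unit =
      fs.create(finalPath, false).close()

    // Staged commit: write into a staging directory (overwriting there is
    // safe), then publish with a rename. A retry simply redoes its own
    // staging work, so the commit is effectively idempotent.
    def stagedWrite(fs: FileSystem, staging: Path, finalPath: Path): Unit = {
      val tmp = new Path(staging, finalPath.getName)
      fs.create(tmp, true).close()
      fs.rename(tmp, finalPath)
    }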