Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 89
1 file changed, 54 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c020..fb6173f58e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+ val pathExists = fs.exists(qualifiedOutputPath)
+ val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
case (SaveMode.Ignore, exists) =>
!exists
}
+ // If we are appending data to an existing dir.
+ val isAppend = (pathExists) && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
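
For reference, a minimal standalone sketch (not part of this patch) of the decision matrix above: given the SaveMode and whether the target path already exists, it returns whether to insert at all and whether the write counts as an append into an existing directory. The cases elided by the hunk (Overwrite or ErrorIfExists against a missing path) are assumed to insert.

import org.apache.spark.sql.SaveMode

object SaveModeDecision {
  // Returns (doInsertion, isAppend) for a given mode and path state.
  def decide(mode: SaveMode, pathExists: Boolean): (Boolean, Boolean) = {
    val doInsertion = (mode, pathExists) match {
      case (SaveMode.ErrorIfExists, true) => sys.error("path already exists.")
      case (SaveMode.Overwrite, true)     => true  // the real code deletes the path first
      case (SaveMode.Append, _)           => true
      case (SaveMode.Ignore, exists)      => !exists
      case (_, false)                     => true  // Overwrite/ErrorIfExists on a fresh path
    }
    // A write only counts as an append when the directory is already there.
    val isAppend = pathExists && (mode == SaveMode.Append)
    (doInsertion, isAppend)
  }

  def main(args: Array[String]): Unit = {
    val (doInsertion, isAppend) = decide(SaveMode.Append, pathExists = true)
    assert(doInsertion && isAppend)                        // append into an existing dir
    assert(!decide(SaveMode.Ignore, pathExists = true)._1) // Ignore + existing path: skip write
  }
}
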
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job), df)
+ insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
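
For context, a hedged sketch of the user-facing calls that reach these two branches, assuming the DataFrameWriter API (df.write) and placeholder paths: a non-partitioned append is handled by DefaultWriterContainer, while a partitioned append goes through DynamicPartitionWriterContainer; in both cases isAppend is true once the target directory exists.

import org.apache.spark.sql.{DataFrame, SaveMode}

def appendExamples(df: DataFrame): Unit = {
  // Non-partitioned append: InsertIntoHadoopFsRelation builds a DefaultWriterContainer.
  df.write.mode(SaveMode.Append).format("parquet").save("/data/events")

  // Partitioned append: a DynamicPartitionWriterContainer is built instead.
  df.write
    .mode(SaveMode.Append)
    .partitionBy("day")
    .format("parquet")
    .save("/data/events_by_day")
}
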
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job)
+ @transient job: Job,
+ isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just a OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
+ val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (isAppend) {
+ // If we are appending data to an existing dir, we will only use the output committer
+ // associated with the file output format since it is not safe to use a custom
+ // committer for appending. For example, in S3, a direct parquet output committer may
+ // leave partial data in the destination dir when the appending job fails.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "for appending.")
+ defaultOutputCommitter
+ } else {
+ val committerClass = context.getConfiguration.getClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just an OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
+ }.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+ defaultOutputCommitter
}
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
- logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
- outputCommitter
}
}
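
As a usage note, a custom committer would be configured through the SQL conf backing SQLConf.OUTPUT_COMMITTER_CLASS (assumed below to be "spark.sql.sources.outputCommitterClass"; the committer class name is a hypothetical example). Per the branch above, the setting only takes effect for non-append writes; appends always fall back to the committer returned by the output format itself.

import org.apache.spark.sql.SQLContext

def configureCustomCommitter(sqlContext: SQLContext): Unit = {
  // Point Spark SQL data sources at a user-defined OutputCommitter. If the class
  // extends FileOutputCommitter, the reflection code above uses its
  // (Path, TaskAttemptContext) constructor; otherwise the no-arg constructor.
  sqlContext.setConf(
    "spark.sql.sources.outputCommitterClass",  // key behind SQLConf.OUTPUT_COMMITTER_CLASS (assumed)
    "com.example.DirectOutputCommitter")       // hypothetical committer class
  // With SaveMode.Append into an existing directory this setting is ignored.
}
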
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
- @transient job: Job)
- extends BaseWriterContainer(relation, job) {
+ @transient job: Job,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
@transient private var writer: OutputWriter = _
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
- defaultPartitionName: String)
- extends BaseWriterContainer(relation, job) {
+ defaultPartitionName: String,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
@transient protected var outputWriters: mutable.Map[String, OutputWriter] = _