diff options
author | Yin Huai <yhuai@databricks.com> | 2015-06-24 09:50:03 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-06-24 09:50:03 -0700 |
commit | bba6699d0e9093bc041a9a33dd31992790f32174 (patch) | |
tree | 0531f84b232f250709e640c2d0d0f6d6a97a7bc3 /sql/core | |
parent | 9d36ec24312f0a9865b4392f89e9611a5b80916d (diff) | |
download | spark-bba6699d0e9093bc041a9a33dd31992790f32174.tar.gz spark-bba6699d0e9093bc041a9a33dd31992790f32174.tar.bz2 spark-bba6699d0e9093bc041a9a33dd31992790f32174.zip |
[SPARK-8578] [SQL] Should ignore user defined output committer when appending data
https://issues.apache.org/jira/browse/SPARK-8578
It is not very safe to use a custom output committer when append data to an existing dir. This changes adds the logic to check if we are appending data, and if so, we use the output committer associated with the file output format.
Author: Yin Huai <yhuai@databricks.com>
Closes #6964 from yhuai/SPARK-8578 and squashes the following commits:
43544c4 [Yin Huai] Do not use a custom output commiter when appendiing data.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 89 |
1 files changed, 54 insertions, 35 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 215e53c020..fb6173f58e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation( val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match { + val pathExists = fs.exists(qualifiedOutputPath) + val doInsertion = (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => sys.error(s"path $qualifiedOutputPath already exists.") case (SaveMode.Overwrite, true) => @@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation( case (SaveMode.Ignore, exists) => !exists } + // If we are appending data to an existing dir. + val isAppend = (pathExists) && (mode == SaveMode.Append) if (doInsertion) { val job = new Job(hadoopConf) @@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation( val partitionColumns = relation.partitionColumns.fieldNames if (partitionColumns.isEmpty) { - insert(new DefaultWriterContainer(relation, job), df) + insert(new DefaultWriterContainer(relation, job, isAppend), df) } else { val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME) + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend) insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) } } @@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation( private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, - @transient job: Job) + @transient job: Job, + isAppend: Boolean) extends SparkHadoopMapReduceUtil with Logging with Serializable { @@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer( } private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { - val committerClass = context.getConfiguration.getClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - Option(committerClass).map { clazz => - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - ctor.newInstance(new Path(outputPath), context) - } else { - // The specified output committer is just a OutputCommitter. - // So, we will use the no-argument constructor. - val ctor = clazz.getDeclaredConstructor() - ctor.newInstance() + val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + + if (isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the the appending job fails. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + "for appending.") + defaultOutputCommitter + } else { + val committerClass = context.getConfiguration.getClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + ctor.newInstance(new Path(outputPath), context) + } else { + // The specified output committer is just a OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + ctor.newInstance() + } + }.getOrElse { + // If output committer class is not set, we will use the one associated with the + // file output format. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") + defaultOutputCommitter } - }.getOrElse { - // If output committer class is not set, we will use the one associated with the - // file output format. - val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) - logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}") - outputCommitter } } @@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer( private[sql] class DefaultWriterContainer( @transient relation: HadoopFsRelation, - @transient job: Job) - extends BaseWriterContainer(relation, job) { + @transient job: Job, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { @transient private var writer: OutputWriter = _ @@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer( @transient relation: HadoopFsRelation, @transient job: Job, partitionColumns: Array[String], - defaultPartitionName: String) - extends BaseWriterContainer(relation, job) { + defaultPartitionName: String, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { // All output writers are created on executor side. @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _ |