author:    Yin Huai <yhuai@databricks.com>    2015-06-24 09:50:03 -0700
committer: Yin Huai <yhuai@databricks.com>    2015-06-24 09:50:03 -0700
commit:    bba6699d0e9093bc041a9a33dd31992790f32174
tree:      0531f84b232f250709e640c2d0d0f6d6a97a7bc3 /sql/core
parent:    9d36ec24312f0a9865b4392f89e9611a5b80916d
[SPARK-8578] [SQL] Should ignore user defined output committer when appending data
https://issues.apache.org/jira/browse/SPARK-8578

It is not safe to use a custom output committer when appending data to an existing dir. This change adds logic to check whether we are appending data, and if so, to use the output committer associated with the file output format instead of the user-defined one.

Author: Yin Huai <yhuai@databricks.com>

Closes #6964 from yhuai/SPARK-8578 and squashes the following commits:

43544c4 [Yin Huai] Do not use a custom output committer when appending data.
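A simplified, standalone sketch of the selection rule this patch introduces (the object and method names CommitterSelection/chooseCommitter are hypothetical and only mirror the newOutputCommitter logic in the diff below; they are not part of the patch):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

object CommitterSelection {
  // Pick the committer for a write job. `userCommitterClass` stands in for the class
  // configured via SQLConf.OUTPUT_COMMITTER_CLASS (may be null), and `defaultCommitter`
  // for the committer returned by the output format itself.
  def chooseCommitter(
      isAppend: Boolean,
      userCommitterClass: Class[_ <: OutputCommitter],
      defaultCommitter: OutputCommitter,
      outputPath: Path,
      context: TaskAttemptContext): OutputCommitter = {
    if (isAppend) {
      // Appending to an existing dir: a custom committer (e.g. a direct S3 committer)
      // could leave partial data behind on failure, so always use the format's own committer.
      defaultCommitter
    } else {
      Option(userCommitterClass).map { clazz =>
        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
          // FileOutputCommitter subclasses expose a (Path, TaskAttemptContext) constructor.
          clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
            .newInstance(outputPath, context)
        } else {
          // A plain OutputCommitter: use the no-argument constructor.
          clazz.getDeclaredConstructor().newInstance()
        }
      }.getOrElse(defaultCommitter)
    }
  }
}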
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala  89
1 file changed, 54 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c020..fb6173f58e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+ val pathExists = fs.exists(qualifiedOutputPath)
+ val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
case (SaveMode.Ignore, exists) =>
!exists
}
+ // Whether we are appending data to an existing dir.
+ val isAppend = (pathExists) && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job), df)
+ insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job)
+ @transient job: Job,
+ isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just an OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
+ val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (isAppend) {
+ // If we are appending data to an existing dir, we will only use the output committer
+ // associated with the file output format since it is not safe to use a custom
+ // committer for appending. For example, in S3, direct parquet output committer may
+ // leave partial data in the destination dir when the appending job fails.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "for appending.")
+ defaultOutputCommitter
+ } else {
+ val committerClass = context.getConfiguration.getClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just an OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
+ }.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+ defaultOutputCommitter
}
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
- logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
- outputCommitter
}
}
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
- @transient job: Job)
- extends BaseWriterContainer(relation, job) {
+ @transient job: Job,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
@transient private var writer: OutputWriter = _
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
- defaultPartitionName: String)
- extends BaseWriterContainer(relation, job) {
+ defaultPartitionName: String,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
@transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
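For context, a hedged usage sketch of how the change behaves from the user side. Assumptions not taken from this patch: the config key string is the value of SQLConf.OUTPUT_COMMITTER_CLASS.key, the DirectParquetOutputCommitter class name is only an illustration of a committer that is unsafe for appends, and `sqlContext`, `df`, and the output path are placeholders:

import org.apache.spark.sql.SaveMode

// Configure a custom output committer for data source writes.
sqlContext.setConf(
  "spark.sql.sources.outputCommitterClass",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// A non-append write still honors the configured committer.
df.write.mode(SaveMode.Overwrite).parquet("/tmp/events")

// With this patch, appending to an existing path ignores the configured committer
// and uses the committer provided by the file output format instead.
df.write.mode(SaveMode.Append).parquet("/tmp/events")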