 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala               | 89
 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 83
 2 files changed, 136 insertions(+), 36 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c020..fb6173f58e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+ val pathExists = fs.exists(qualifiedOutputPath)
+ val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
case (SaveMode.Ignore, exists) =>
!exists
}
+ // Whether we are appending data to an existing dir.
+ val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job), df)
+ insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
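For context, a minimal usage sketch (the path and column name are illustrative, not part of this patch): the second write below targets an existing directory with append mode, so the check above computes isAppend = true and passes it down to the writer container.

// Hypothetical example of reaching the append branch above.
val df = sqlContext.range(0, 10).toDF("i")
df.write.mode("append").format("parquet").save("/tmp/spark-8578-example") // path does not exist yet: isAppend = false
df.write.mode("append").format("parquet").save("/tmp/spark-8578-example") // path now exists: isAppend = true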
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job)
+ @transient job: Job,
+ isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just a OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
+ val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (isAppend) {
+ // If we are appending data to an existing dir, we only use the output committer
+ // associated with the file output format, since it is not safe to use a custom
+ // committer for appending. For example, on S3, a direct Parquet output committer may
+ // leave partial data in the destination dir when the appending job fails.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "for appending.")
+ defaultOutputCommitter
+ } else {
+ val committerClass = context.getConfiguration.getClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just a OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
+ }.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+ defaultOutputCommitter
}
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
- logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
- outputCommitter
}
}
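As a hedged configuration sketch (MyCommitter is a hypothetical FileOutputCommitter subclass, not part of this patch), a custom committer would be registered through the Hadoop configuration roughly as follows and, per the branch above, is honored only when isAppend is false:

// Mirrors what the new test below does with AlwaysFailOutputCommitter.
sqlContext.sparkContext.hadoopConfiguration.set(
  SQLConf.OUTPUT_COMMITTER_CLASS.key,
  classOf[MyCommitter].getName)
// For appends to an existing directory this setting is deliberately ignored and
// the output format's default committer is used instead.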
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
- @transient job: Job)
- extends BaseWriterContainer(relation, job) {
+ @transient job: Job,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
@transient private var writer: OutputWriter = _
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
- defaultPartitionName: String)
- extends BaseWriterContainer(relation, job) {
+ defaultPartitionName: String,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
@transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
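For reference, a hedged sketch of the committer shape that the reflective instantiation in newOutputCommitter expects (MyCommitter is hypothetical; the test classes added below follow the same pattern):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// A FileOutputCommitter subclass must keep the (Path, TaskAttemptContext)
// constructor so BaseWriterContainer can instantiate it via reflection.
class MyCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {
  override def commitJob(jobContext: JobContext): Unit = {
    // Custom commit behavior would go here; delegating keeps the default semantics.
    super.commitJob(jobContext)
  }
}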
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e0d8277a8e..a16ab3a00d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,16 @@
package org.apache.spark.sql.sources
+import scala.collection.JavaConversions._
+
import java.io.File
import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
// more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
// requirement. We probably want to move this test case to spark-integration-tests or spark-perf
// later.
- test("SPARK-8406: Avoids name collision while writing Parquet files") {
+ test("SPARK-8406: Avoids name collision while writing files") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("SPARK-8578 specified custom output committer will not be used to append data") {
+ val clonedConf = new Configuration(configuration)
+ try {
+ val df = sqlContext.range(1, 10).toDF("i")
+ withTempPath { dir =>
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+ // Since Parquet has its own output committer setting, also set it
+ // to AlwaysFailParquetOutputCommitter here.
+ configuration.set("spark.sql.parquet.output.committer.class",
+ classOf[AlwaysFailParquetOutputCommitter].getName)
+ // Because data already exists, this append should succeed: the output committer
+ // associated with the file format is used, and AlwaysFailOutputCommitter is
+ // never invoked.
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .load(dir.getCanonicalPath),
+ df.unionAll(df))
+
+ // This will fail because AlwaysFailOutputCommitter is used when we are not appending.
+ intercept[Exception] {
+ df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+ }
+ }
+ withTempPath { dir =>
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+ // Since Parquet has its own output committer setting, also set it
+ // to AlwaysFailParquetOutputCommitter here.
+ configuration.set("spark.sql.parquet.output.committer.class",
+ classOf[AlwaysFailParquetOutputCommitter].getName)
+ // Because there is no existing data, this append is effectively a fresh write,
+ // so the custom AlwaysFailOutputCommitter is still used and the write fails.
+ intercept[Exception] {
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ }
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
+ }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+ outputPath: Path,
+ context: TaskAttemptContext)
+ extends FileOutputCommitter(outputPath, context) {
+
+ override def commitJob(context: JobContext): Unit = {
+ sys.error("Intentional job commitment failure for testing purpose.")
+ }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+ outputPath: Path,
+ context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+
+ override def commitJob(context: JobContext): Unit = {
+ sys.error("Intentional job commitment failure for testing purpose.")
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {