author     Cheng Lian <lian@databricks.com>  2015-06-25 00:06:23 -0700
committer  Cheng Lian <lian@databricks.com>  2015-06-25 00:06:23 -0700
commit     c337844ed7f9b2cb7b217dc935183ef5e1096ca1 (patch)
tree       e6b7c881d0335fe9f2c3ec8de0b7fe48272107ea /sql
parent     7bac2fe7717c0102b4875dbd95ae0bbf964536e3 (diff)
[SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class
`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format class, so that the default output committer can be set up correctly when appending (where user-defined output committers are ignored).

Author: Cheng Lian <lian@databricks.com>

Closes #6998 from liancheng/spark-8604 and squashes the following commits:

9be51d1 [Cheng Lian] Adds more comments
6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class
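For context on why registering the output format class matters here: in Hadoop's new MapReduce API, the default output committer is obtained from whatever `OutputFormat` class is registered on the `Job`, so registering `ParquetOutputFormat[Row]` is what makes the fallback committer a `ParquetOutputCommitter`. A minimal sketch of that lookup (illustrative only, not Spark code; `resolveCommitter` is a made-up name):

import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.util.ReflectionUtils

// Illustrative only: roughly how the new MapReduce API derives the default
// output committer from the output format class registered on the job.
def resolveCommitter(job: Job, context: TaskAttemptContext): OutputCommitter = {
  // getOutputFormatClass returns whatever setOutputFormatClass registered.
  val format = ReflectionUtils.newInstance(job.getOutputFormatClass, job.getConfiguration)
  // For ParquetOutputFormat this is a ParquetOutputCommitter, which also
  // writes the _metadata / _common_metadata summary files on job commit.
  format.getOutputCommitter(context)
}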
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala              6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala           12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala      2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala 21
4 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1d353bd8e1..bc39fae2bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -194,6 +194,12 @@ private[sql] class ParquetRelation2(
         committerClass,
         classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+    // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
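The override mentioned in the comment above means the registered format class never actually writes data: `ParquetOutputWriter` builds its own `ParquetOutputFormat` whose work-file names carry a unique suffix, so appended output cannot collide with files from earlier jobs. A hypothetical sketch of that shape (`appendSafeFormat`, `outputDir`, and `uniqueId` are invented for illustration):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import parquet.hadoop.ParquetOutputFormat

// Hypothetical sketch: an output format whose work files carry a
// caller-supplied unique suffix, so appends never overwrite older files.
def appendSafeFormat(outputDir: String, uniqueId: String): ParquetOutputFormat[Row] =
  new ParquetOutputFormat[Row]() {
    override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path =
      new Path(outputDir, s"part-r-$uniqueId$extension")
  }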
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 705f48f1cd..0fd7b3a91d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -194,6 +194,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
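The `match` in this hunk is needed because ORC writes through the old `mapred` API: `JobConf` has a typed setter for old-style output formats, while a plain `Configuration` only exposes the raw key that setter populates. The same pattern in isolation, sketched against Hadoop's stock `TextOutputFormat` (the helper name is invented):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, TextOutputFormat}

// Sketch: register an old-style (mapred) output format on whichever
// configuration type we were handed.
def setMapRedOutputFormat(conf: Configuration): Unit = conf match {
  case jobConf: JobConf =>
    jobConf.setOutputFormat(classOf[TextOutputFormat[_, _]])
  case other =>
    // "mapred.output.format.class" is the key JobConf.setOutputFormat writes.
    other.setClass(
      "mapred.output.format.class",
      classOf[TextOutputFormat[_, _]],
      classOf[MapRedOutputFormat[_, _]])
}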
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 5d7cd16c12..e8141923a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
         path: String,
         dataSchema: StructType,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index a16ab3a00d..afecf9675e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }
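From a user's perspective, the behavior the new test pins down can be reproduced roughly as follows (a sketch assuming a live `SQLContext` bound to `sqlContext`; the scratch path is illustrative):

import org.apache.spark.sql.SaveMode

// Hypothetical walk-through of the fixed behavior.
val path = "/tmp/spark-8604-demo"
val df = sqlContext.range(0, 5)
df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(SaveMode.Append).parquet(path)
// With the output format class set, the append job commits through
// ParquetOutputCommitter, so _metadata and _common_metadata are
// (re)generated under `path` after the append.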