author     Michael Armbrust <michael@databricks.com>   2014-11-20 18:31:02 -0800
committer  Michael Armbrust <michael@databricks.com>   2014-11-20 18:31:02 -0800
commit     02ec058efe24348cdd3691b55942e6f0ef138732 (patch)
tree       aeb665a9e313f6dfe9de73329987c26762537c8f /sql/hive
parent     84d79ee9ec47465269f7b0a7971176da93c96f3f (diff)
[SPARK-4413][SQL] Parquet support through datasource API
Goals:
- Support for accessing Parquet using SQL but not requiring Hive (thus allowing support of Parquet tables with decimal columns)
- Support for folder-based partitioning with automatic discovery of available partitions
- Caching of file metadata

See the scaladoc of `ParquetRelation2` for more details.

Author: Michael Armbrust <michael@databricks.com>

Closes #3269 from marmbrus/newParquet and squashes the following commits:

1dd75f1 [Michael Armbrust] Pass all paths for FileInputFormat at once.
645768b [Michael Armbrust] Review comments.
abd8e2f [Michael Armbrust] Alternative implementation of parquet based on the datasources API.
938019e [Michael Armbrust] Add an experimental interface to data sources that exposes catalyst expressions.
e9d2641 [Michael Armbrust] logging / formatting improvements.
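For context, a minimal usage sketch of the new access path (the table name and path are illustrative, and the `sql` helper is assumed to come from a test context like the suites changed below):

    // Register a Parquet directory through the data sources API, with no Hive dependency.
    sql("""
      create temporary table events
      USING org.apache.spark.sql.parquet
      OPTIONS (
        path '/data/events'
      )
    """)

    // Subdirectories named p=1, p=2, ... under the path are discovered automatically as
    // partitions, and the partition column can be queried like any other column.
    sql("SELECT stringField FROM events WHERE p = 1")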
Diffstat (limited to 'sql/hive')
-rw-r--r--   sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala)   178
1 file changed, 112 insertions, 66 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index cc65242c0d..7159ebd035 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -34,71 +34,52 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
/**
- * Tests for our SerDe -> Native parquet scan conversion.
+ * A suite to test the automatic conversion of metastore tables with parquet data to use the
+ * built in parquet support.
*/
-class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+class ParquetMetastoreSuite extends ParquetTest {
override def beforeAll(): Unit = {
- val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
- partitionedTableDir.delete()
- partitionedTableDir.mkdir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDir, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetData(i, s"part-$p"))
- .saveAsParquetFile(partDir.getCanonicalPath)
- }
-
- val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
- partitionedTableDirWithKey.delete()
- partitionedTableDirWithKey.mkdir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDirWithKey, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetDataWithKey(p, i, s"part-$p"))
- .saveAsParquetFile(partDir.getCanonicalPath)
- }
+ super.beforeAll()
sql(s"""
- create external table partitioned_parquet
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDir.getCanonicalPath}'
+ create external table partitioned_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDir.getCanonicalPath}'
""")
sql(s"""
- create external table partitioned_parquet_with_key
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDirWithKey.getCanonicalPath}'
+ create external table partitioned_parquet_with_key
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDirWithKey.getCanonicalPath}'
""")
sql(s"""
- create external table normal_parquet
- (
- intField INT,
- stringField STRING
- )
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ create external table normal_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
""")
(1 to 10).foreach { p =>
@@ -116,6 +97,82 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
+ test("conversion is working") {
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: HiveTableScan => true
+ }.isEmpty)
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: ParquetTableScan => true
+ }.nonEmpty)
+ }
+}
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class ParquetSourceSuite extends ParquetTest {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sql( s"""
+ create temporary table partitioned_parquet
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDir.getCanonicalPath}'
+ )
+ """)
+
+ sql( s"""
+ create temporary table partitioned_parquet_with_key
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDirWithKey.getCanonicalPath}'
+ )
+ """)
+
+ sql( s"""
+ create temporary table normal_parquet
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ )
+ """)
+ }
+}
+
+/**
+ * A collection of tests for parquet data with various forms of partitioning.
+ */
+abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+ var partitionedTableDir: File = null
+ var partitionedTableDirWithKey: File = null
+
+ override def beforeAll(): Unit = {
+ partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDir.delete()
+ partitionedTableDir.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDir, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
+ partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDirWithKey.delete()
+ partitionedTableDirWithKey.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+ }
+
Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
test(s"project the partitioning column $table") {
checkAnswer(
@@ -193,15 +250,4 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
sql("SELECT COUNT(*) FROM normal_parquet"),
10)
}
-
- test("conversion is working") {
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: HiveTableScan => true
- }.isEmpty)
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: ParquetTableScan => true
- }.nonEmpty)
- }
}
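
For reference, the shared ParquetTest setup above produces Hive-style partition directories on disk; a rough sketch of the layout (temp paths and file names illustrative):

    partitionedTableDir/
      p=1/    part-*.parquet   (rows ParquetData(i, "part-1"))
      p=2/    part-*.parquet
      ...
      p=10/   part-*.parquet

    partitionedTableDirWithKey/   same p=<n> layout, but each row also stores the
                                  partition value in a regular column (ParquetDataWithKey)

ParquetMetastoreSuite points Hive external tables at these directories (exercising the spark.sql.hive.convertMetastoreParquet conversion), while ParquetSourceSuite points the data sources API at the same directories (exercising automatic partition discovery); both then run the shared queries defined in ParquetTest.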