path: root/sql
author    Michael Armbrust <michael@databricks.com>    2014-11-18 12:13:23 -0800
committer Michael Armbrust <michael@databricks.com>    2014-11-18 12:13:37 -0800
commit    047b45800654b4b2605d1347cace28faaa25f521 (patch)
tree      5fc408ae58f439c016d4a0738c26295570e5f711 /sql
parent    48d601f0bac33583c345b2ceebd30a639a20db4e (diff)
[SQL] Support partitioned parquet tables that have the key in both the directory and the file
Author: Michael Armbrust <michael@databricks.com>

Closes #3272 from marmbrus/keyInPartitionedTable and squashes the following commits:

447f08c [Michael Armbrust] Support partitioned parquet tables that have the key in both the directory and the file

(cherry picked from commit 90d72ec8502f7ec11d2fe42f08c884ad2159266f)
Signed-off-by: Michael Armbrust <michael@databricks.com>
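The core of the change is a guard in HiveStrategies.scala: when every partition column name already appears in the relation's output schema, the partitioning attributes are not injected a second time. A minimal sketch of that check, under illustrative names (alreadyContainsPartitionKey and its parameters are stand-ins, not code from the patch):

// Sketch of the subset check this patch adds. The committed code compares
// attrs.map(_.name) against s.logicalPlan.output.map(_.name).
def alreadyContainsPartitionKey(
    partitionColumns: Seq[String],
    dataColumns: Seq[String]): Boolean =
  partitionColumns.toSet.subsetOf(dataColumns.toSet)

// Example: a table partitioned by `p` whose Parquet files also store a `p` column.
assert(alreadyContainsPartitionKey(Seq("p"), Seq("p", "intField", "stringField")))
// Key present only in the directory structure: attributes still need to be added.
assert(!alreadyContainsPartitionKey(Seq("p"), Seq("intField", "stringField")))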
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala           18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala  158
2 files changed, 108 insertions, 68 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3a49dddd85..56fc85239e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
def lowerCase =
new SchemaRDD(s.sqlContext, s.logicalPlan)
- def addPartitioningAttributes(attrs: Seq[Attribute]) =
- new SchemaRDD(
- s.sqlContext,
- s.logicalPlan transform {
- case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
- })
+ def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+ // Don't add the partitioning key if it's already present in the data.
+ if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+ s
+ } else {
+ new SchemaRDD(
+ s.sqlContext,
+ s.logicalPlan transform {
+ case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+ })
+ }
+ }
}
implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 86adbbf3ad..cc65242c0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
+// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key.
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
/**
* Tests for our SerDe -> Native parquet scan conversion.
@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
.saveAsParquetFile(partDir.getCanonicalPath)
}
+ val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDirWithKey.delete()
+ partitionedTableDirWithKey.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
sql(s"""
create external table partitioned_parquet
(
@@ -60,6 +75,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
""")
sql(s"""
+ create external table partitioned_parquet_with_key
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDirWithKey.getCanonicalPath}'
+ """)
+
+ sql(s"""
create external table normal_parquet
(
intField INT,
@@ -76,6 +105,10 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+ }
+
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
@@ -83,75 +116,76 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
- test("project the partitioning column") {
- checkAnswer(
- sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
- (1, 10) ::
- (2, 10) ::
- (3, 10) ::
- (4, 10) ::
- (5, 10) ::
- (6, 10) ::
- (7, 10) ::
- (8, 10) ::
- (9, 10) ::
- (10, 10) :: Nil
- )
- }
+ Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+ test(s"project the partitioning column $table") {
+ checkAnswer(
+ sql(s"SELECT p, count(*) FROM $table group by p"),
+ (1, 10) ::
+ (2, 10) ::
+ (3, 10) ::
+ (4, 10) ::
+ (5, 10) ::
+ (6, 10) ::
+ (7, 10) ::
+ (8, 10) ::
+ (9, 10) ::
+ (10, 10) :: Nil
+ )
+ }
- test("project partitioning and non-partitioning columns") {
- checkAnswer(
- sql("SELECT stringField, p, count(intField) " +
- "FROM partitioned_parquet GROUP BY p, stringField"),
- ("part-1", 1, 10) ::
- ("part-2", 2, 10) ::
- ("part-3", 3, 10) ::
- ("part-4", 4, 10) ::
- ("part-5", 5, 10) ::
- ("part-6", 6, 10) ::
- ("part-7", 7, 10) ::
- ("part-8", 8, 10) ::
- ("part-9", 9, 10) ::
- ("part-10", 10, 10) :: Nil
- )
- }
+ test(s"project partitioning and non-partitioning columns $table") {
+ checkAnswer(
+ sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+ ("part-1", 1, 10) ::
+ ("part-2", 2, 10) ::
+ ("part-3", 3, 10) ::
+ ("part-4", 4, 10) ::
+ ("part-5", 5, 10) ::
+ ("part-6", 6, 10) ::
+ ("part-7", 7, 10) ::
+ ("part-8", 8, 10) ::
+ ("part-9", 9, 10) ::
+ ("part-10", 10, 10) :: Nil
+ )
+ }
- test("simple count") {
- checkAnswer(
- sql("SELECT COUNT(*) FROM partitioned_parquet"),
- 100)
- }
+ test(s"simple count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table"),
+ 100)
+ }
- test("pruned count") {
- checkAnswer(
- sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
- 10)
- }
+ test(s"pruned count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+ 10)
+ }
- test("multi-partition pruned count") {
- checkAnswer(
- sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
- 30)
- }
+ test(s"multi-partition pruned count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+ 30)
+ }
- test("non-partition predicates") {
- checkAnswer(
- sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
- 30)
- }
+ test(s"non-partition predicates $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+ 30)
+ }
- test("sum") {
- checkAnswer(
- sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
- 1 + 2 + 3)
- }
+ test(s"sum $table") {
+ checkAnswer(
+ sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+ 1 + 2 + 3)
+ }
- test("hive udfs") {
- checkAnswer(
- sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
- sql("SELECT stringField FROM partitioned_parquet").map {
- case Row(s: String) => Row(s + s)
- }.collect().toSeq)
+ test(s"hive udfs $table") {
+ checkAnswer(
+ sql(s"SELECT concat(stringField, stringField) FROM $table"),
+ sql(s"SELECT stringField FROM $table").map {
+ case Row(s: String) => Row(s + s)
+ }.collect().toSeq)
+ }
}
test("non-part select(*)") {