author     Cheng Lian <lian@databricks.com>    2015-02-28 21:15:43 +0800
committer  Cheng Lian <lian@databricks.com>    2015-02-28 21:15:43 +0800
commit     e6003f0a571ba44fcd011e695c8622e11cfee7dd (patch)
tree       df9e1852405e08f2c58f03b215b2a59a919ff5b5 /sql/hive
parent     9168259813713a12251fb0d457ffbbed8ba857f8 (diff)
download   spark-e6003f0a571ba44fcd011e695c8622e11cfee7dd.tar.gz
           spark-e6003f0a571ba44fcd011e695c8622e11cfee7dd.tar.bz2
           spark-e6003f0a571ba44fcd011e695c8622e11cfee7dd.zip
[SPARK-5775] [SQL] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
This PR adapts anselmevignon's #4697 to master and branch-1.3. Please refer to the PR description of #4697 for details.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4792)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #4792 from liancheng/spark-5775 and squashes the following commits:

538f506 [Cheng Lian] Addresses comments
cee55cf [Cheng Lian] Merge pull request #4 from yhuai/spark-5775-yin
b0b74fb [Yin Huai] Remove runtime pattern matching.
ca6e038 [Cheng Lian] Fixes SPARK-5775
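For context, the failure mode addressed here can be sketched as follows. This is a minimal, hypothetical reproduction against the Spark 1.3-era HiveContext API; the table name, schema, and directory below are invented for illustration and are not taken from the patch. It mirrors the DDL used by the new test cases: querying a nested field of a partitioned, metastore-backed Parquet table used to fail with `GenericRow cannot be cast to SpecificMutableRow`.

```scala
// Hypothetical minimal reproduction (Spark 1.3-era API); names and paths are illustrative only.
import java.io.File

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object Spark5775Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "spark-5775-repro")
    val hiveContext = new HiveContext(sc)

    // Illustrative location; Parquet files with nested rows would need to
    // exist under p=1 for the read path to actually be exercised.
    val dataDir = new File("/tmp/spark-5775-nested-partitioned")

    // A partitioned metastore Parquet table with a nested struct column,
    // mirroring the DDL added to the test suite below.
    hiveContext.sql(s"""
      CREATE EXTERNAL TABLE nested_partitioned
      (
        intField INT,
        stringField STRING,
        structField STRUCT<intStructField: INT, stringStructField: STRING>
      )
      PARTITIONED BY (p INT)
      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      LOCATION '${dataDir.getCanonicalPath}'
    """)
    hiveContext.sql("ALTER TABLE nested_partitioned ADD PARTITION (p=1)")

    // Before this fix, selecting the nested field through the automatic
    // metastore-to-Parquet conversion could throw a ClassCastException:
    // GenericRow cannot be cast to SpecificMutableRow.
    hiveContext.sql(
      "SELECT p, structField.intStructField FROM nested_partitioned WHERE p = 1"
    ).collect()
  }
}
```

The new test cases in parquetSuites.scala below exercise exactly this combination of partitioning and nested types, both for metastore-converted tables and for the Parquet data source path.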
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala | 134
1 file changed, 131 insertions(+), 3 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 6a9d9daf67..c8da8eea4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -36,6 +36,20 @@ case class ParquetData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+ intField: Int,
+ stringField: String,
+ structField: StructContainer,
+ arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+ p: Int,
+ intField: Int,
+ stringField: String,
+ structField: StructContainer,
+ arrayField: Seq[Int])
/**
* A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -86,6 +100,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
location '${new File(normalTableDir, "normal").getCanonicalPath}'
""")
+ sql(s"""
+ CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+ (
+ intField INT,
+ stringField STRING,
+ structField STRUCT<intStructField: INT, stringStructField: STRING>,
+ arrayField ARRAY<INT>
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+ """)
+
+ sql(s"""
+ CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+ (
+ intField INT,
+ stringField STRING,
+ structField STRUCT<intStructField: INT, stringStructField: STRING>,
+ arrayField ARRAY<INT>
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+ """)
+
(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}
@@ -94,7 +140,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}
- val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+ }
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+ }
+
+ val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
jsonRDD(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
jsonRDD(rdd2).registerTempTable("jt_array")
@@ -105,6 +159,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
override def afterAll(): Unit = {
sql("DROP TABLE partitioned_parquet")
sql("DROP TABLE partitioned_parquet_with_key")
+ sql("DROP TABLE partitioned_parquet_with_complextypes")
+ sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
sql("DROP TABLE normal_parquet")
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS jt_array")
@@ -409,6 +465,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
)
""")
+
+ sql( s"""
+ CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+ )
+ """)
+
+ sql( s"""
+ CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+ )
+ """)
}
test("SPARK-6016 make sure to use the latest footers") {
@@ -473,7 +545,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
var partitionedTableDir: File = null
var normalTableDir: File = null
var partitionedTableDirWithKey: File = null
-
+ var partitionedTableDirWithComplexTypes: File = null
+ var partitionedTableDirWithKeyAndComplexTypes: File = null
override def beforeAll(): Unit = {
partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -509,9 +582,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
.toDF()
.saveAsParquetFile(partDir.getCanonicalPath)
}
+
+ partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDirWithKeyAndComplexTypes.delete()
+ partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+ sparkContext.makeRDD(1 to 10).map { i =>
+ ParquetDataWithKeyAndComplexTypes(
+ p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+ }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
+ partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDirWithComplexTypes.delete()
+ partitionedTableDirWithComplexTypes.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+ sparkContext.makeRDD(1 to 10).map { i =>
+ ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+ }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+ }
+ }
+
+ override protected def afterAll(): Unit = {
+ partitionedTableDir.delete()
+ normalTableDir.delete()
+ partitionedTableDirWithKey.delete()
+ partitionedTableDirWithComplexTypes.delete()
+ partitionedTableDirWithKeyAndComplexTypes.delete()
}
- Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+ Seq(
+ "partitioned_parquet",
+ "partitioned_parquet_with_key",
+ "partitioned_parquet_with_complextypes",
+ "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -601,6 +710,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
}
}
+ Seq(
+ "partitioned_parquet_with_key_and_complextypes",
+ "partitioned_parquet_with_complextypes").foreach { table =>
+
+ test(s"SPARK-5775 read struct from $table") {
+ checkAnswer(
+ sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+ (1 to 10).map(i => Row(1, i, f"${i}_string")))
+ }
+
+ // Re-enable this after SPARK-5508 is fixed
+ ignore(s"SPARK-5775 read array from $table") {
+ checkAnswer(
+ sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+ (1 to 10).map(i => Row(1 to i, 1)))
+ }
+ }
+
+
test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),