author     Cheng Lian <lian@databricks.com>     2015-08-24 14:11:19 -0700
committer  Davies Liu <davies.liu@gmail.com>    2015-08-24 14:11:30 -0700
commit     d36f3517c8ddd8f9b5f05d0634dc2d49448200d9
tree       243f5c4122795f599714b4afa88363bcb8909a90
parent     831f78ee5d2deed9b529214b2613c7e972453514
[SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases
This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add new test cases. While working on this, two bugs were hit, SPARK-10177 and HIVE-11625; test cases for both have been added but are marked as ignored for now. SPARK-10177 will be addressed in a separate PR.

Author: Cheng Lian <lian@databricks.com>

Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests.

(cherry picked from commit a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala  132
1 file changed, 93 insertions(+), 39 deletions(-)
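The refactoring replaces the single hard-coded test with a `testParquetHiveCompatibility(row, hiveTypes*)` helper, so each new compatibility case only needs a Row of expected values plus the matching Hive column types. As a hypothetical sketch of how a further case could be added on top of this change (the test name and column types below are illustrative, not part of this commit):

    // Hypothetical follow-up test, assuming the helper introduced in this diff.
    // One non-null row is passed; the helper itself appends the all-null row.
    test("array of structs (illustrative)") {
      testParquetHiveCompatibility(
        Row(Seq(Row(1, "foo"), Row(2, null), null)),
        "ARRAY<STRUCT<f0: INT, f1: STRING>>")
    }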
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 13452e71a1..bc30180cf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,15 +17,17 @@
package org.apache.spark.sql.hive
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
- import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
override def _sqlContext: SQLContext = TestHive
private val sqlContext = _sqlContext
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
*/
private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
- test("Read Parquet file generated by parquet-hive") {
+ private val originalTimeZone = TimeZone.getDefault
+ private val originalLocale = Locale.getDefault
+
+ protected override def beforeAll(): Unit = {
+ TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+ Locale.setDefault(Locale.US)
+ }
+
+ override protected def afterAll(): Unit = {
+ TimeZone.setDefault(originalTimeZone)
+ Locale.setDefault(originalLocale)
+ }
+
+ override protected def logParquetSchema(path: String): Unit = {
+ val schema = readParquetSchema(path, { path =>
+ !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+ })
+
+ logInfo(
+ s"""Schema of the Parquet file written by parquet-avro:
+ |$schema
+ """.stripMargin)
+ }
+
+ private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
withTable("parquet_compat") {
withTempPath { dir =>
val path = dir.getCanonicalPath
+ // Hive columns are always nullable, so here we append an all-null row.
+ val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+ // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
withTempTable("data") {
- sqlContext.sql(
+ val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" }
+
+ val ddl =
s"""CREATE TABLE parquet_compat(
- | bool_column BOOLEAN,
- | byte_column TINYINT,
- | short_column SMALLINT,
- | int_column INT,
- | long_column BIGINT,
- | float_column FLOAT,
- | double_column DOUBLE,
- |
- | strings_column ARRAY<STRING>,
- | int_to_string_column MAP<INT, STRING>
+ |${fields.mkString(",\n")}
|)
|STORED AS PARQUET
|LOCATION '$path'
+ """.stripMargin
+
+ logInfo(
+ s"""Creating testing Parquet table with the following DDL:
+ |$ddl
""".stripMargin)
+ sqlContext.sql(ddl)
+
val schema = sqlContext.table("parquet_compat").schema
- val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+ val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
}
}
- val schema = readParquetSchema(path, { path =>
- !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
- })
-
- logInfo(
- s"""Schema of the Parquet file written by parquet-hive:
- |$schema
- """.stripMargin)
+ logParquetSchema(path)
// Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
// Have to assume all BINARY values are strings here.
withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
- checkAnswer(sqlContext.read.parquet(path), makeRows)
+ checkAnswer(sqlContext.read.parquet(path), rows)
}
}
}
}
- def makeRows: Seq[Row] = {
- (0 until 10).map { i =>
- def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+ test("simple primitives") {
+ testParquetHiveCompatibility(
+ Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
+ "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
+ }
+ ignore("SPARK-10177 timestamp") {
+ testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
+ }
+
+ test("array") {
+ testParquetHiveCompatibility(
Row(
- nullable(i % 2 == 0: java.lang.Boolean),
- nullable(i.toByte: java.lang.Byte),
- nullable((i + 1).toShort: java.lang.Short),
- nullable(i + 2: Integer),
- nullable(i.toLong * 10: java.lang.Long),
- nullable(i.toFloat + 0.1f: java.lang.Float),
- nullable(i.toDouble + 0.2d: java.lang.Double),
- nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
- nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
- }
+ Seq[Integer](1: Integer, null, 2: Integer, null),
+ Seq[String]("foo", null, "bar", null),
+ Seq[Seq[Integer]](
+ Seq[Integer](1: Integer, null),
+ Seq[Integer](2: Integer, null))),
+ "ARRAY<INT>",
+ "ARRAY<STRING>",
+ "ARRAY<ARRAY<INT>>")
+ }
+
+ test("map") {
+ testParquetHiveCompatibility(
+ Row(
+ Map[Integer, String](
+ (1: Integer) -> "foo",
+ (2: Integer) -> null)),
+ "MAP<INT, STRING>")
+ }
+
+ // HIVE-11625: Parquet map entries with null keys are dropped by Hive
+ ignore("map entries with null keys") {
+ testParquetHiveCompatibility(
+ Row(
+ Map[Integer, String](
+ null.asInstanceOf[Integer] -> "bar",
+ null.asInstanceOf[Integer] -> null)),
+ "MAP<INT, STRING>")
+ }
+
+ test("struct") {
+ testParquetHiveCompatibility(
+ Row(Row(1, Seq("foo", "bar", null))),
+ "STRUCT<f0: INT, f1: ARRAY<STRING>>")
}
}