aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-08-12 20:01:34 +0800
committerCheng Lian <lian@databricks.com>2015-08-12 20:01:34 +0800
commit3ecb3794302dc12d0989f8d725483b2cc37762cf (patch)
treeb8c3a132482fe273a71f3f9bb2235bebc395744e /sql/hive
parent9d0822455ddc8d765440d58c463367a4d67ef456 (diff)
downloadspark-3ecb3794302dc12d0989f8d725483b2cc37762cf.tar.gz
spark-3ecb3794302dc12d0989f8d725483b2cc37762cf.tar.bz2
spark-3ecb3794302dc12d0989f8d725483b2cc37762cf.zip
[SPARK-9407] [SQL] Relaxes Parquet ValidTypeMap to allow ENUM predicates to be pushed down
This PR adds a hacky workaround for PARQUET-201, and should be removed once we upgrade to parquet-mr 1.8.1 or higher versions. In Parquet, not all types of columns can be used for filter push-down optimization. The set of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and prior versions, this limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be pushed down. On the other hand, `BINARY (ENUM)` is commonly seen in Parquet files written by libraries like `parquet-avro`. This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps to Parquet `BINARY (ENUM)` directly, and always converts `BINARY (ENUM)` to Catalyst `StringType`. Thus, a predicate involving a `BINARY (ENUM)` is recognized as one involving a string field instead and can be pushed down by the query optimizer. Such predicates are actually perfectly legal except that it fails the `ValidTypeMap` check. The workaround added here is relaxing `ValidTypeMap` to include `BINARY (ENUM)`. I also took the chance to simplify `ParquetCompatibilityTest` a little bit when adding regression test. Author: Cheng Lian <lian@databricks.com> Closes #8107 from liancheng/spark-9407/parquet-enum-filter-push-down.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala83
1 files changed, 42 insertions, 41 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 80eb9f122a..251e0324bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -32,53 +32,54 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
* Set the staging directory (and hence path to ignore Parquet files under)
* to that set by [[HiveConf.ConfVars.STAGINGDIR]].
*/
- override val stagingDir: Option[String] =
- Some(new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR))
+ private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
- override protected def beforeAll(): Unit = {
- super.beforeAll()
+ test("Read Parquet file generated by parquet-hive") {
+ withTable("parquet_compat") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
- withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
- withTempTable("data") {
- sqlContext.sql(
- s"""CREATE TABLE parquet_compat(
- | bool_column BOOLEAN,
- | byte_column TINYINT,
- | short_column SMALLINT,
- | int_column INT,
- | long_column BIGINT,
- | float_column FLOAT,
- | double_column DOUBLE,
- |
- | strings_column ARRAY<STRING>,
- | int_to_string_column MAP<INT, STRING>
- |)
- |STORED AS PARQUET
- |LOCATION '${parquetStore.getCanonicalPath}'
- """.stripMargin)
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+ withTempTable("data") {
+ sqlContext.sql(
+ s"""CREATE TABLE parquet_compat(
+ | bool_column BOOLEAN,
+ | byte_column TINYINT,
+ | short_column SMALLINT,
+ | int_column INT,
+ | long_column BIGINT,
+ | float_column FLOAT,
+ | double_column DOUBLE,
+ |
+ | strings_column ARRAY<STRING>,
+ | int_to_string_column MAP<INT, STRING>
+ |)
+ |STORED AS PARQUET
+ |LOCATION '$path'
+ """.stripMargin)
- val schema = sqlContext.table("parquet_compat").schema
- val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
- sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
- sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
- }
- }
- }
+ val schema = sqlContext.table("parquet_compat").schema
+ val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+ sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
+ sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
+ }
+ }
- override protected def afterAll(): Unit = {
- sqlContext.sql("DROP TABLE parquet_compat")
- }
+ val schema = readParquetSchema(path, { path =>
+ !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+ })
- test("Read Parquet file generated by parquet-hive") {
- logInfo(
- s"""Schema of the Parquet file written by parquet-hive:
- |${readParquetSchema(parquetStore.getCanonicalPath)}
- """.stripMargin)
+ logInfo(
+ s"""Schema of the Parquet file written by parquet-hive:
+ |$schema
+ """.stripMargin)
- // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
- // Have to assume all BINARY values are strings here.
- withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
- checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
+ // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
+ // Have to assume all BINARY values are strings here.
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+ checkAnswer(sqlContext.read.parquet(path), makeRows)
+ }
+ }
}
}