author    hyukjinkwon <gurwls223@gmail.com>  2015-10-30 18:17:35 +0800
committer Cheng Lian <lian@databricks.com>   2015-10-30 18:17:35 +0800
commit    59db9e9c382fab40aac0633f2c779bee8cf2025f (patch)
tree      de2faab40e68f01e07671bc9e699a2e359b1d8f2
parent    86d65265fcab7edab88a7bdb10acba47da95bcb3 (diff)
[SPARK-11103][SQL] Filter applied on merged Parquet schema with new column fails
When schema merging and predicate push-down are both enabled, queries fail because Parquet rejects pushed-down filters that reference columns missing from a file's schema. This is related to a Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389). For now, this PR simply disables predicate push-down whenever a merged schema is used.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9327 from HyukjinKwon/SPARK-11103.
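A minimal sketch of the failing scenario, mirroring the regression test added below; the output paths, the sqlContext handle, and the sample data are illustrative and assume the Spark 1.5-era SQLContext API:

import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = ??? // illustrative: obtained from the running SparkContext
import sqlContext.implicits._

// Enable both predicate push-down and schema merging.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

// Two Parquet directories whose schemas only partially overlap.
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet("/tmp/table1")
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet("/tmp/table2")

// Before this patch, filtering on a column present in only one of the merged
// schemas ("c") made Parquet throw (PARQUET-389), because the pushed-down
// filter referenced a column absent from table1's footer schema.
sqlContext.read.parquet("/tmp/table1", "/tmp/table2").filter("c = 1").show()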
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala    6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala  20
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca48..44649a68b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+ // When schema merging is enabled and a filter references a column that does not exist
+ // in the schema, Parquet throws an exception. This is a known Parquet issue (PARQUET-389).
+ val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
// of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
dataSchema,
parquetBlockSize,
useMetadataCache,
- parquetFilterPushDown,
+ safeParquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp) _
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 13fdd555a4..b2101beb92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/table1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/table2"
+ (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+ // If the "c = 1" filter gets pushed down, this query throws an exception raised
+ // by Parquet. This is a known Parquet issue (PARQUET-389).
+ checkAnswer(
+ sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+ (1 to 1).map(i => Row(i, i.toString, null)))
+ }
+ }
+ }
}
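For users on versions without this patch, a possible workaround implied by the change is to turn predicate push-down off whenever a merged schema is read; a hedged sketch, reusing the illustrative sqlContext and paths from the snippet above:

// Hypothetical workaround for affected versions: disable Parquet filter push-down
// while reading with schema merging, then restore the previous setting.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/table1", "/tmp/table2")
merged.filter("c = 1").show()
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")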