From 59db9e9c382fab40aac0633f2c779bee8cf2025f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 30 Oct 2015 18:17:35 +0800
Subject: [SPARK-11103][SQL] Filter applied on merged Parquet schema with new
 column fails

When schema merging and predicate push-down are both enabled, queries fail
because Parquet does not accept pushed-down filters whose columns do not
exist in the schema of every part-file. This is caused by a Parquet issue
(https://issues.apache.org/jira/browse/PARQUET-389). For now, this PR simply
disables predicate push-down whenever a merged schema is used.

Author: hyukjinkwon

Closes #9327 from HyukjinKwon/SPARK-11103.
---
 .../datasources/parquet/ParquetRelation.scala      |  6 +++++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca48..44649a68b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
 
+    // When schema merging is enabled and a filter references a column that does not exist
+    // in every part-file's schema, Parquet throws an exception (PARQUET-389).
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
     // of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
         dataSchema,
         parquetBlockSize,
         useMetadataCache,
-        parquetFilterPushDown,
+        safeParquetFilterPushDown,
         assumeBinaryIsString,
         assumeInt96IsTimestamp) _
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 13fdd555a4..b2101beb92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter gets pushed down, this query throws the exception that
+        // Parquet emits for filters on missing columns (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }
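
Reproduction sketch (not part of the patch): a minimal standalone Scala program,
assuming a Spark 1.5.x-era API, a local master, and a hypothetical scratch path
under /tmp; the object name is made up for illustration. It recreates the scenario
from the commit message: two Parquet directories with different schemas, read back
with schema merging enabled and a filter on the column "c" that exists in only one
of them. Before this patch the pushed-down filter could trigger PARQUET-389; with
the patch, push-down is disabled when schemas are merged, so Spark evaluates the
filter itself and the query succeeds.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical driver, for illustration only.
object MergedSchemaFilterRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-11103-repro").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Both predicate push-down and schema merging must be on to hit the issue.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

    val base = "/tmp/spark-11103" // hypothetical scratch path

    // table1 has columns (a, b); table2 has (c, b). Column "c" is new in table2.
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b")
      .write.mode("overwrite").parquet(s"$base/table1")
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b")
      .write.mode("overwrite").parquet(s"$base/table2")

    // Filtering on "c", which is absent from table1's files, used to throw once
    // the filter was pushed into Parquet (PARQUET-389). With this patch applied,
    // push-down is skipped under merged schemas and one row is returned.
    sqlContext.read.parquet(s"$base/table1", s"$base/table2")
      .filter("c = 1")
      .show()

    sc.stop()
  }
}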