 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala    |  6 +++++-
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca48..44649a68b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
 
+    // When schema merging is enabled and a pushed-down filter references a column that is
+    // missing from some files, Parquet throws an exception (PARQUET-389), so skip push-down.
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
     // of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
         dataSchema,
         parquetBlockSize,
         useMetadataCache,
-        parquetFilterPushDown,
+        safeParquetFilterPushDown,
         assumeBinaryIsString,
         assumeInt96IsTimestamp) _
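
For context on the guard above: once schema merging is on, the change makes Spark skip filter push-down entirely instead of letting Parquet fail on a column that is absent from some files. A hedged, 1.5-era spark-shell sketch of the scenario (the /tmp paths and the data are illustrative, not taken from the patch):

// Illustrative shell session; the config keys are the 1.5-era spark.sql.parquet.* names.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

import sqlContext.implicits._
sc.parallelize(1 to 3).map(i => (i, i.toString)).toDF("a", "b")
  .write.parquet("/tmp/t/part1")
sc.parallelize(1 to 3).map(i => (i, i.toString)).toDF("c", "b")
  .write.parquet("/tmp/t/part2")

// Column "c" exists only under part2. Before this patch, the pushed-down "c = 1" filter
// made Parquet throw (PARQUET-389); with the guard, push-down is skipped and the query
// returns the one matching row, with null for the column a file does not contain.
sqlContext.read.parquet("/tmp/t/part1", "/tmp/t/part2").filter("c = 1").show()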
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 13fdd555a4..b2101beb92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter were pushed down, Parquet would throw an exception because
+        // column "c" does not exist in the files under pathOne (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }
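
A note to make the test's expected answer concrete: under schema merging, a row read from a file that lacks a column surfaces null for that column, and the expected Row(1, "1", null) implies the merged schema here puts the filtered column "c" first and the column missing from table2's files, "a", last. A minimal standalone sketch of that expectation, modelling SQL null with Option (plain Scala, no Spark APIs; all names are illustrative):

object MergedSchemaSketch {
  // Field order mirrors the (c, b, a) layout implied by the test's expected Row.
  case class MergedRow(c: Option[Int], b: Option[String], a: Option[Int])

  def main(args: Array[String]): Unit = {
    // Files under table1 contribute (a, b); "c" is absent there, hence None.
    val fromTableOne = (1 to 3).map(i => MergedRow(None, Some(i.toString), Some(i)))
    // Files under table2 contribute (c, b); "a" is absent there, hence None.
    val fromTableTwo = (1 to 3).map(i => MergedRow(Some(i), Some(i.toString), None))

    // The test's filter "c = 1" keeps exactly one row, with None standing in for the
    // null that checkAnswer expects in Row(1, "1", null).
    val matched = (fromTableOne ++ fromTableTwo).filter(_.c.contains(1))
    assert(matched == Seq(MergedRow(Some(1), Some("1"), None)))
  }
}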