diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-12-14 11:29:11 -0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-12-14 11:29:11 -0800 |
commit | 89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95 (patch) | |
tree | 04ecef872ff084070c5876671ffe89f51e60388e /sql/core/src/test | |
parent | 169b9d73ee2136194df42c8deaaa95572b4ae56c (diff) | |
download | spark-89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95.tar.gz spark-89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95.tar.bz2 spark-89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95.zip |
[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
## What changes were proposed in this pull request?
Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.
For example, the codes below:
```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```
shows it keeps `null` properly.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]
== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]
== Physical Plan ==
*Filter (isnotnull(_1#17) && null) << Here `null` is there
+- LocalTableScan [_1#17]
```
However, when we read it back from Parquet,
```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```
`null` is removed at the post-filter.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet
== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11) << Here `null` is missing
+- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```
This PR fixes it to keep it properly. In more details,
```scala
val partitionKeyFilters =
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```
This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty which is always the subset of `partitionSet`.
And then in
```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```
`null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.
After this PR, it becomes as below:
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet
== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
+- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```
## How was this patch tested?
Unit test in `FileSourceStrategySuite`
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16184 from HyukjinKwon/SPARK-18753.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index d900ce7bb2..f36162858b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter") { + val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS() + withTempPath { p => + val path = p.getAbsolutePath + ds.write.parquet(path) + val readBack = spark.read.parquet(path).filter($"_1" === "true") + val filtered = ds.filter($"_1" === "true").toDF() + checkAnswer(readBack, filtered) + } + } + // Helpers for checking the arguments passed to the FileFormat. protected val checkPartitionSchema = |