diff options
author | Dilip Biswal <dbiswal@us.ibm.com> | 2017-04-12 12:18:01 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-04-12 12:18:01 +0800 |
commit | b14bfc3f8e97479ac5927c071b00ed18f2104c95 (patch) | |
tree | 0ee9e829fa341baa93677ecd578055d3267cc4d5 /sql/core/src/main | |
parent | 8ad63ee158815de5ffff7bf03cdf25aef312095f (diff) | |
download | spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.tar.gz spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.tar.bz2 spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.zip |
[SPARK-19993][SQL] Caching logical plans containing subquery expressions does not work.
## What changes were proposed in this pull request?
The sameResult() method does not work when the logical plan contains subquery expressions.
**Before the fix**
```SQL
scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)")
ds: org.apache.spark.sql.DataFrame = [c1: int]
scala> ds.cache
res13: ds.type = [c1: int]
scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true)
== Analyzed Logical Plan ==
c1: int
Project [c1#86]
+- Filter c1#86 IN (list#78 [c1#86])
: +- Project [c1#87]
: +- Filter (outer(c1#86) = c1#87)
: +- SubqueryAlias s2
: +- Relation[c1#87] parquet
+- SubqueryAlias s1
+- Relation[c1#86] parquet
== Optimized Logical Plan ==
Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87))
:- Relation[c1#86] parquet
+- Relation[c1#87] parquet
```
**Plan after fix**
```SQL
== Analyzed Logical Plan ==
c1: int
Project [c1#22]
+- Filter c1#22 IN (list#14 [c1#22])
: +- Project [c1#23]
: +- Filter (outer(c1#22) = c1#23)
: +- SubqueryAlias s2
: +- Relation[c1#23] parquet
+- SubqueryAlias s1
+- Relation[c1#22] parquet
== Optimized Logical Plan ==
InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight
:- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295))))
+- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
```
## How was this patch tested?
New tests are added to CachedTableSuite.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #17330 from dilipbiswal/subquery_cache_final.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 3a9132d74a..866fa98533 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} @@ -516,10 +517,10 @@ case class FileSourceScanExec( override lazy val canonicalized: FileSourceScanExec = { FileSourceScanExec( relation, - output.map(normalizeExprId(_, output)), + output.map(QueryPlan.normalizeExprId(_, output)), requiredSchema, - partitionFilters.map(normalizeExprId(_, output)), - dataFilters.map(normalizeExprId(_, output)), + partitionFilters.map(QueryPlan.normalizeExprId(_, output)), + dataFilters.map(QueryPlan.normalizeExprId(_, output)), None) } } |