author: Dilip Biswal <dbiswal@us.ibm.com>, 2017-01-08 23:09:07 +0100
committer: Herman van Hovell <hvanhovell@databricks.com>, 2017-01-08 23:09:07 +0100
commit: 4351e62207957bec663108a571cff2bfaaa9e7d5
tree: d716198441b06c53be549985879d706a51609339 (limited to 'sql/core/src/main')
parent: cd1d00adaff65e8adfebc2342dd422c53f98166b
[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression
## What changes were proposed in this pull request?
Consider the plans inside subquery expressions when looking up the cache manager, so that cached data is reused there as well. Currently `CacheManager.useCachedData` only rewrites the main plan and does not consider subquery expressions, so a cached table referenced only inside a subquery is re-read from its source instead of being replaced with its `InMemoryRelation`.
Query 1
```
select * from rows where not exists (select * from rows)
```
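For context, the scenario can be reproduced with a sketch like the following (the table name `rows` and the `/tmp/rows` path come from the plans below; the data and session setup are assumptions, not part of the original PR):

```scala
// Sketch: build and cache a `rows` table backed by parquet, then run the
// anti-join query. Data values and the session setup are illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-19093-repro").getOrCreate()
import spark.implicits._

Seq(("a", "b"), ("c", "d")).toDF("_1", "_2")
  .write.mode("overwrite").parquet("/tmp/rows")

spark.read.parquet("/tmp/rows").createOrReplaceTempView("rows")
spark.table("rows").cache()  // registers an InMemoryRelation in the CacheManager

// Before the fix only the outer scan used the cache; the NOT EXISTS
// subquery re-read the parquet files directly.
spark.sql("select * from rows where not exists (select * from rows)").explain(true)
```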
Before the fix
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#3775, _2#3776], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#3775 AS _1#3775#4001, _2#3776 AS _2#3776#4002]
+- Relation[_1#3775,_2#3776] parquet
```
After the fix
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#256 AS _1#256#298, _2#257 AS _2#257#299]
+- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
```
Query 2
```
SELECT * FROM t1
WHERE
c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
```
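The nested-subquery case can be set up along these lines (the view names `t1`/`t2`/`t3` and column `c1` come from the plans below; the data is an assumption):

```scala
// Sketch (assumed setup): three cached single-column views, matching the
// `Project [value#N AS c1#M] +- LocalRelation` shape in the analyzed plan.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-19093-nested").getOrCreate()
import spark.implicits._

Seq(1, 2).toDF("c1").createOrReplaceTempView("t1")
Seq(1, 3).toDF("c1").createOrReplaceTempView("t2")
Seq(1).toDF("c1").createOrReplaceTempView("t3")

// Cache all three; before the fix only t1 (the outer table) was picked up.
Seq("t1", "t2", "t3").foreach(name => spark.table(name).cache())

spark.sql(
  """SELECT * FROM t1
    |WHERE c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
  """.stripMargin).explain(true)
```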
Before
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
: +- Project [c1#10]
: +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
: : +- Project [c1#17]
: : +- Filter (c1#17 = 1)
: : +- SubqueryAlias t3, `t3`
: : +- Project [value#15 AS c1#17]
: : +- LocalRelation [value#15]
: +- SubqueryAlias t2, `t2`
: +- Project [value#8 AS c1#10]
: +- LocalRelation [value#8]
+- SubqueryAlias t1, `t1`
+- Project [value#1 AS c1#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
: +- LocalTableScan [c1#3]
+- Project [value#8 AS c1#10]
+- Join LeftSemi, (value#8 = c1#17)
:- LocalRelation [value#8]
+- Project [value#15 AS c1#17]
+- Filter (value#15 = 1)
+- LocalRelation [value#15]
```
After
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
: +- Project [c1#10]
: +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
: : +- Project [c1#17]
: : +- Filter (c1#17 = 1)
: : +- SubqueryAlias t3, `t3`
: : +- Project [value#15 AS c1#17]
: : +- LocalRelation [value#15]
: +- SubqueryAlias t2, `t2`
: +- Project [value#8 AS c1#10]
: +- LocalRelation [value#8]
+- SubqueryAlias t1, `t1`
+- Project [value#1 AS c1#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
: +- LocalTableScan [c1#3]
+- Join LeftSemi, (c1#10 = c1#17)
:- InMemoryRelation [c1#10], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t2
: +- LocalTableScan [c1#10]
+- Filter (c1#17 = 1)
+- InMemoryRelation [c1#17], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t3
+- LocalTableScan [c1#17]
```
## How was this patch tested?
Added new tests in `CachedTableSuite`.
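The test suite itself is outside this diffstat (which is limited to `sql/core/src/main`), but a regression check in the spirit of `CachedTableSuite` could look like the following sketch; the helper name and assertions are assumptions, not the PR's actual test code:

```scala
// Sketch: count InMemoryRelation nodes in the optimized plan, including
// those nested inside subquery expressions, to verify caches are used.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.execution.columnar.InMemoryRelation

def inMemoryRelationCount(df: DataFrame): Int = {
  val plan = df.queryExecution.optimizedPlan
  // Relations in the main plan.
  var count = plan.collect { case i: InMemoryRelation => i }.size
  // Relations inside subquery expressions (the case this PR fixes).
  plan.transformAllExpressions {
    case s: SubqueryExpression =>
      count += s.plan.collect { case i: InMemoryRelation => i }.size
      s
  }
  count
}

// With t1, t2, t3 all cached, the nested IN query above should report
// three InMemoryRelation nodes after the fix, rather than one.
```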
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #16493 from dilipbiswal/SPARK-19093.
Diffstat (limited to 'sql/core/src/main')
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 526623a36d..4ca1347008 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -131,12 +132,16 @@ class CacheManager extends Logging {
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
-    plan transformDown {
+    val newPlan = plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
           .map(_.cachedRepresentation.withOutput(currentFragment.output))
           .getOrElse(currentFragment)
     }
+
+    newPlan transformAllExpressions {
+      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
+    }
   }
```