author | Tejas Patil <tejasp@fb.com> | 2016-09-10 09:27:22 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-09-10 09:27:22 +0800 |
commit | 335491704c526921da3b3c5035175677ba5b92de (patch) | |
tree | 43194c82c849831ea54c96c012aef4f6f04d49ca /licenses | |
parent | f7d2143705c8c1baeed0bc62940f9dba636e705b (diff) | |
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request?
Jira: https://issues.apache.org/jira/browse/SPARK-15453
This change extracts sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation was to bring Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed and sorted.
Query:
```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```
Before:
```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```
After: (note that the `Sort` step is no longer there)
```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
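Why the `Sort` disappears can be illustrated with a small standalone model. This is only a sketch, not Spark's actual planner code: `SortKey`, `Direction`, and `needsSort` are hypothetical names made up for this example, and the rule is simplified to "a sort is needed unless the required keys are a prefix of the ordering the child already reports". Before the patch the scan reported no ordering; after it, the scan of a bucketed and sorted table can report `j ASC, k ASC`.

```scala
// Simplified, hypothetical model; these are NOT Spark's real classes.
sealed trait Direction
case object Asc extends Direction
case object Desc extends Direction

final case class SortKey(column: String, direction: Direction)

object OrderingCheck {
  // Assumption for this sketch: the child satisfies the requirement when
  // the required keys form a prefix of the ordering the child guarantees.
  def needsSort(required: Seq[SortKey], childOutput: Seq[SortKey]): Boolean =
    required.nonEmpty && !childOutput.startsWith(required)

  def main(args: Array[String]): Unit = {
    // Ordering that the sort-merge join requires on each side.
    val required = Seq(SortKey("j", Asc), SortKey("k", Asc))

    // Before the patch: the file scan exposes no ordering information.
    val scanBefore = Seq.empty[SortKey]
    // After the patch: the scan exposes the table's sort columns.
    val scanAfter = Seq(SortKey("j", Asc), SortKey("k", Asc))

    println(s"before: needsSort = ${needsSort(required, scanBefore)}")
    println(s"after: needsSort = ${needsSort(required, scanAfter)}")
  }
}
```

In this model the "before" case asks for a sort and the "after" case does not, which mirrors the two physical plans above.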
## How was this patch tested?
Added a test case in `JoinSuite` and ran all other tests in `JoinSuite`.
Author: Tejas Patil <tejasp@fb.com>
Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.