| author | wangzhenhua <wangzhenhua@huawei.com> | 2017-03-08 16:01:28 +0100 |
|---|---|---|
| committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-08 16:01:28 +0100 |
| commit | e44274870dee308f4e3e8ce79457d8d19693b6e5 (patch) | |
| tree | 99cde8d5b623e14e9b1a8fa86ab152a4cba0e640 /sql/core | |
| parent | 9ea201cf6482c9c62c9428759d238063db62d66e (diff) | |
[SPARK-17080][SQL] join reorder
## What changes were proposed in this pull request?
Reorder joins using a dynamic programming algorithm, following the classic Selinger (System R) paper:
First we put all items (basic joined nodes) into level 1. Then we build all two-way joins at level 2 from the plans at level 1 (single items), build all three-way joins at level 3 from the plans at the previous levels (two-way joins and single items), and so on, until we have built all n-way joins and pick the best plan among them.
When building m-way joins, we keep only the best plan (the one with the lowest cost) for each set of m items. E.g., for three-way joins, we keep only the best plan for the item set {A, B, C} among the plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained at each level when reordering four items A, B, C, D are as follows:
```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```
where p({A, B, C, D}) is the final output plan.
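The level-wise enumeration above can be sketched as a small, self-contained toy. Everything here (the `Plan` class, the `reorder` helper, the selectivity-based cardinality estimate, and using the sum of intermediate cardinalities as the cost) is an illustrative assumption for the sketch, not Spark's actual implementation:

```scala
object JoinReorderSketch {
  // A memoized plan: the set of base items it covers, its estimated output
  // cardinality, the cumulative cost accrued while building it, and a
  // string rendering of the join tree.
  final case class Plan(items: Set[String], outRows: Long, cost: Long, tree: String)

  /** Selinger-style level-wise DP. `card` gives base-item cardinalities;
    * `sel` gives a toy selectivity per unordered item pair (default 1.0). */
  def reorder(card: Map[String, Long], sel: Map[Set[String], Double]): Plan = {
    val n = card.size
    // level 1: one plan per single item, with zero accumulated cost
    var byLevel = Map(1 -> card.map { case (i, c) => Set(i) -> Plan(Set(i), c, 0L, i) })
    for (level <- 2 to n) {
      val best = scala.collection.mutable.Map.empty[Set[String], Plan]
      // build level-m plans by joining disjoint plans from previous levels
      for {
        k <- 1 until level
        left <- byLevel(k).values
        right <- byLevel(level - k).values
        if (left.items intersect right.items).isEmpty
      } {
        // estimated output = product of input cardinalities times the
        // selectivity of every cross-input item pair
        val s = (for (a <- left.items.toSeq; b <- right.items.toSeq)
          yield sel.getOrElse(Set(a, b), 1.0)).product
        val outRows = math.max(1L, (left.outRows * right.outRows * s).toLong)
        val plan = Plan(left.items ++ right.items, outRows,
          left.cost + right.cost + outRows, s"(${left.tree} J ${right.tree})")
        // keep only the cheapest plan per item set
        if (best.get(plan.items).forall(_.cost > plan.cost)) best(plan.items) = plan
      }
      byLevel += level -> best.toMap
    }
    byLevel(n).values.minBy(_.cost)
  }
}
```

For three items A (100 rows), B (10 rows), C (1000 rows) with selective A-B and B-C predicates, the sketch avoids the plan that joins A and C first, since that plan pays for a large intermediate result.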
For cost evaluation, since physical costs for operators are not available yet, we use cardinalities and sizes to compute costs.
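One way such a cardinality-plus-size comparison can look is sketched below. The `Cost` class, the 0.7 weight, and the relative-comparison form are assumptions made for illustration; they are not taken verbatim from the patch:

```scala
object CostSketch {
  // Estimated cost of a plan: output cardinality (rows) and physical size (bytes).
  final case class Cost(rows: BigInt, size: BigInt)

  // Plan `a` beats plan `b` when its weighted relative cardinality and size
  // are smaller overall: relRows * w + relSize * (1 - w) < 1.
  // `w` weights cardinality against size (0.7 is an assumed default).
  def betterThan(a: Cost, b: Cost, w: Double = 0.7): Boolean =
    if (b.rows == 0 || b.size == 0) false
    else {
      val relRows = a.rows.toDouble / b.rows.toDouble
      val relSize = a.size.toDouble / b.size.toDouble
      relRows * w + relSize * (1 - w) < 1
    }
}
```

Note the strict `< 1`: a plan with identical estimates does not replace the one already kept, which keeps the DP deterministic when costs tie.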
## How was this patch tested?
Added test cases.
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes #17138 from wzhfy/joinReorder.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 16 |
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala | 2 |
2 files changed, 17 insertions(+), 1 deletion(-)
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fd3acd42e8..94e3fa7dd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -668,6 +668,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val JOIN_REORDER_ENABLED =
+    buildConf("spark.sql.cbo.joinReorder.enabled")
+      .doc("Enables join reorder in CBO.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val JOIN_REORDER_DP_THRESHOLD =
+    buildConf("spark.sql.cbo.joinReorder.dp.threshold")
+      .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
+      .intConf
+      .createWithDefault(12)
+
   val SESSION_LOCAL_TIMEZONE =
     buildConf("spark.sql.session.timeZone")
       .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
@@ -885,6 +897,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
 
+  override def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
+
+  override def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index d44a6e41cb..a4d012cd76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -45,7 +45,7 @@ class SparkSqlParserSuite extends PlanTest {
    * Normalizes plans:
    * - CreateTable the createTime in tableDesc will replaced by -1L.
    */
-  private def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+  override def normalizePlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case CreateTable(tableDesc, mode, query) =>
         val newTableDesc = tableDesc.copy(createTime = -1L)
```
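The new behavior is off by default; a session would opt in roughly as below. This is a hypothetical usage fragment (it assumes a running `SparkSession` named `spark`); the config keys and defaults come from the diff above:

```scala
// Join reorder only takes effect when CBO itself is enabled.
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)
// Maximum number of joined nodes handled by the DP algorithm (default 12).
spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", 12)
```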