From e44274870dee308f4e3e8ce79457d8d19693b6e5 Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Wed, 8 Mar 2017 16:01:28 +0100
Subject: [SPARK-17080][SQL] join reorder

## What changes were proposed in this pull request?

Reorder the joins using a dynamic programming algorithm (Selinger paper):
First we put all items (basic joined nodes) into level 1, then we build all
two-way joins at level 2 from plans at level 1 (single items), then build all
3-way joins from plans at previous levels (two-way joins and single items),
then 4-way joins, and so on, until we build all n-way joins and pick the best
plan among them.

When building m-way joins, we only keep the best plan (with the lowest cost)
for the same set of m items. E.g., for 3-way joins, we keep only the best plan
for items {A, B, C} among the plans (A J B) J C, (A J C) J B and (B J C) J A.

Thus, the plans maintained at each level when reordering four items A, B, C, D
are as follows:

```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```

where p({A, B, C, D}) is the final output plan.

For cost evaluation, since physical costs for operators are not currently
available, we use cardinalities and sizes to compute costs.

## How was this patch tested?

Added test cases.

Author: wangzhenhua
Author: Zhenhua Wang

Closes #17138 from wzhfy/joinReorder.
---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fd3acd42e8..94e3fa7dd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -668,6 +668,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val JOIN_REORDER_ENABLED =
+    buildConf("spark.sql.cbo.joinReorder.enabled")
+      .doc("Enables join reorder in CBO.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val JOIN_REORDER_DP_THRESHOLD =
+    buildConf("spark.sql.cbo.joinReorder.dp.threshold")
+      .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
+      .intConf
+      .createWithDefault(12)
+
   val SESSION_LOCAL_TIMEZONE =
     buildConf("spark.sql.session.timeZone")
       .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
@@ -885,6 +897,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
 
+  override def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
+
+  override def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
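
As an illustration of the level-by-level search described above, here is a minimal, self-contained Scala sketch. Everything in it is hypothetical: `Plan`, `join`, and the cost formula are stand-ins invented for illustration (they are not Spark's internal classes), and it skips the join-condition (connectivity) checks a real optimizer performs before combining two sub-plans; the actual rule costs plans using cardinality and size statistics.

```scala
import scala.collection.mutable

object JoinReorderSketch {

  // A candidate plan: the set of base items it covers, a printable join
  // tree, and an estimated cost (a stand-in for cardinality/size costs).
  case class Plan(itemIds: Set[Int], tree: String, cost: Double)

  // Hypothetical cost model: charge both inputs plus a fraction of the
  // cross-product cardinality. The real model uses column statistics.
  def join(left: Plan, right: Plan): Plan =
    Plan(
      left.itemIds ++ right.itemIds,
      s"(${left.tree} J ${right.tree})",
      left.cost + right.cost + 0.1 * left.cost * right.cost)

  // Selinger-style DP: build the best plan for every item set, level by
  // level, keeping only the cheapest plan per set, as described above.
  def search(items: Seq[Plan]): Plan = {
    val n = items.size
    val levels = mutable.Map(1 -> items.map(p => p.itemIds -> p).toMap)
    for (k <- 2 to n) {
      val best = mutable.Map.empty[Set[Int], Plan]
      for {
        i <- 1 to k / 2                              // split k = i + (k - i)
        left <- levels(i).values
        right <- levels(k - i).values
        if (left.itemIds & right.itemIds).isEmpty    // disjoint item sets only
      } {
        val cand = join(left, right)
        if (best.get(cand.itemIds).forall(cand.cost < _.cost))
          best(cand.itemIds) = cand                  // keep the cheapest per set
      }
      levels(k) = best.toMap
    }
    levels(n).values.head                            // the single n-item set
  }

  def main(args: Array[String]): Unit = {
    // Base items A..D with made-up cardinalities as their initial costs.
    val items = Seq("A" -> 100.0, "B" -> 10.0, "C" -> 1000.0, "D" -> 1.0)
      .zipWithIndex
      .map { case ((name, card), id) => Plan(Set(id), name, card) }
    val plan = search(items)
    println(s"best: ${plan.tree} cost: ${plan.cost}")
  }
}
```

Keeping only the cheapest plan per item set bounds level k to at most C(n, k) plans, so the search visits at most 2^n item sets in total; this exponential growth is what the `spark.sql.cbo.joinReorder.dp.threshold` cap of 12 guards against.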
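
To actually exercise the new code path, the existing CBO switch must be on as well as the new flag, since join reorder runs as part of the cost-based optimizer. A usage sketch, assuming an existing `SparkSession` named `spark`; the two `joinReorder` keys come from the diff above, and `spark.sql.cbo.enabled` is the key behind the pre-existing `CBO_ENABLED` conf:

```scala
// Both reorder-related keys default to off / 12 respectively.
spark.conf.set("spark.sql.cbo.enabled", "true")                // CBO master switch
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")    // enable join reorder
spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", "12") // DP node limit
```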