diff options
author | Zhenhua Wang <wzh_zju@163.com> | 2017-01-03 12:19:52 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-01-03 12:19:52 +0800 |
commit | ae83c211257c508989c703d54f2aeec8b2b5f14d (patch) | |
tree | ffad724a91c46592dab3a3118d2082bf614d1654 /sql/core/src/main | |
parent | a6cd9dbc6095570e93dab1d93671abecdce40c25 (diff) | |
download | spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.tar.gz spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.tar.bz2 spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.zip |
[SPARK-18998][SQL] Add a cbo conf to switch between default statistics and estimated statistics
## What changes were proposed in this pull request?
We add a cbo configuration to switch between default stats and estimated stats.
We also define a new statistics method `planStats` in LogicalPlan with conf as its parameter, in order to pass the cbo switch and other estimation related configurations in the future. `planStats` is used on the caller sides (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.
## How was this patch tested?
Add a test case using a dummy LogicalPlan.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes #16401 from wzhfy/cboSwitch.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 12 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 9 |
2 files changed, 15 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ba82ec156e..81cd5ef340 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Matches a plan whose output should be small enough to be used in broadcast join. */ private def canBroadcast(plan: LogicalPlan): Boolean = { - plan.statistics.isBroadcastable || - (plan.statistics.sizeInBytes >= 0 && - plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) + plan.planStats(conf).isBroadcastable || + (plan.planStats(conf).sizeInBytes >= 0 && + plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold) } /** @@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * dynamic. */ private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = { - plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions + plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions } /** @@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * use the size of bytes here as estimation. */ private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { - a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes + a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes } private def canBuildRight(joinType: JoinType): Boolean = joinType match { @@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Join(left, right, joinType, condition) => val buildSide = - if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) { + if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) { BuildRight } else { BuildLeft diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 304dcb691b..322cc7c928 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -642,6 +642,12 @@ object SQLConf { .doubleConf .createWithDefault(0.05) + val CBO_ENABLED = + SQLConfigBuilder("spark.sql.cbo.enabled") + .doc("Enables CBO for estimation of plan statistics when set true.") + .booleanConf + .createWithDefault(false) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) + + override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ |