about | summary | refs | log | tree | commit | diff
path: root/sql/core/src/main
diff options
context:
space:
mode:
author: Zhenhua Wang <wzh_zju@163.com> 2017-01-03 12:19:52 +0800
committer: Wenchen Fan <wenchen@databricks.com> 2017-01-03 12:19:52 +0800
commit: ae83c211257c508989c703d54f2aeec8b2b5f14d (patch)
tree: ffad724a91c46592dab3a3118d2082bf614d1654 /sql/core/src/main
parent: a6cd9dbc6095570e93dab1d93671abecdce40c25 (diff)
download: spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.tar.gz
          spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.tar.bz2
          spark-ae83c211257c508989c703d54f2aeec8b2b5f14d.zip
[SPARK-18998][SQL] Add a cbo conf to switch between default statistics and estimated statistics
## What changes were proposed in this pull request?

We add a cbo configuration to switch between default stats and estimated stats. We also define a new statistics method `planStats` in LogicalPlan with conf as its parameter, in order to pass the cbo switch and other estimation related configurations in the future. `planStats` is used on the caller sides (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16401 from wzhfy/cboSwitch.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala9
2 files changed, 15 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ba82ec156e..81cd5ef340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
private def canBroadcast(plan: LogicalPlan): Boolean = {
- plan.statistics.isBroadcastable ||
- (plan.statistics.sizeInBytes >= 0 &&
- plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
+ plan.planStats(conf).isBroadcastable ||
+ (plan.planStats(conf).sizeInBytes >= 0 &&
+ plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}
/**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* dynamic.
*/
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
- plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+ plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
/**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* use the size of bytes here as estimation.
*/
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
- a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
+ a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Join(left, right, joinType, condition) =>
val buildSide =
- if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+ if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) {
BuildRight
} else {
BuildLeft
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 304dcb691b..322cc7c928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,6 +642,12 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)
+ val CBO_ENABLED =
+ SQLConfigBuilder("spark.sql.cbo.enabled")
+ .doc("Enables CBO for estimation of plan statistics when set true.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+ override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */