author    Liang-Chi Hsieh <viirya@gmail.com>   2017-03-25 00:04:51 +0100
committer Reynold Xin <rxin@databricks.com>   2017-03-25 00:04:51 +0100
commit    e011004bedca47be998a0c14fe22a6f9bb5090cd (patch)
tree      5e78d0fcda440161a33669cb12ccadedf36da7ba /sql/catalyst
parent    b5c5bd98ea5e8dbfebcf86c5459bdf765f5ceb53 (diff)
[SPARK-19846][SQL] Add a flag to disable constraint propagation
## What changes were proposed in this pull request?

Constraint propagation can be computationally expensive and can block driver execution for a long time. For example, the benchmark below takes 30 minutes to run. Compared with the previous PRs #16998 and #16785, this is a much simpler option: add a flag to disable constraint propagation.

### Benchmark

Run the following code locally:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))(
  (df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime
```

Before this patch: 1786001 ms, roughly 30 minutes.
After this patch: 26392 ms, less than half a minute.

Related PRs: #16998, #16785.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17186 from viirya/add-flag-disable-constraint-propagation.
Diffstat (limited to 'sql/catalyst'):

 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala                            |  3
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala                           | 22
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala                               |  6
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala                               | 11
 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                                       | 11
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala |  5
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala          |  5
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala    | 19
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala           | 30
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala         |  5
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala                   | 40
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala                   |  3
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala              | 18
 13 files changed, 158 insertions(+), 20 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
index ac97987c55..8498cf1c9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
@@ -43,7 +43,8 @@ case class SimpleCatalystConf(
override val starSchemaDetection: Boolean = false,
override val warehousePath: String = "/user/hive/warehouse",
override val sessionLocalTimeZone: String = TimeZone.getDefault().getID,
- override val maxNestedViewDepth: Int = 100)
+ override val maxNestedViewDepth: Int = 100,
+ override val constraintPropagationEnabled: Boolean = true)
extends SQLConf {
override def clone(): SimpleCatalystConf = this.copy()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d7524a57ad..ee7de86921 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -83,12 +83,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
// Operator push down
PushProjectionThroughUnion,
ReorderJoin(conf),
- EliminateOuterJoin,
+ EliminateOuterJoin(conf),
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown(conf),
ColumnPruning,
- InferFiltersFromConstraints,
+ InferFiltersFromConstraints(conf),
// Operator combine
CollapseRepartition,
CollapseProject,
@@ -107,7 +107,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
- PruneFilters,
+ PruneFilters(conf),
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
@@ -615,8 +615,16 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
-object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case class InferFiltersFromConstraints(conf: CatalystConf)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) {
+ inferFilters(plan)
+ } else {
+ plan
+ }
+
+ private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val newFilters = filter.constraints --
(child.constraints ++ splitConjunctivePredicates(condition))
@@ -705,7 +713,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
* 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
* 3) by eliminating the always-true conditions given the constraints on the child's output.
*/
-object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
+case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the filter condition always evaluates to true, remove the filter.
case Filter(Literal(true, BooleanType), child) => child
@@ -718,7 +726,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
case f @ Filter(fc, p: LogicalPlan) =>
val (prunedPredicates, remainingPredicates) =
splitConjunctivePredicates(fc).partition { cond =>
- cond.deterministic && p.constraints.contains(cond)
+ cond.deterministic && p.getConstraints(conf.constraintPropagationEnabled).contains(cond)
}
if (prunedPredicates.isEmpty) {
f
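The rule changes above all follow the same gating pattern: the rule now takes the config in its constructor and short-circuits to the identity transformation when the flag is off, so the potentially expensive traversal never runs. Here is a minimal, self-contained sketch of that pattern, using toy stand-ins rather than the actual Catalyst `Rule`, `LogicalPlan`, and `CatalystConf` types:

```scala
// A minimal sketch of the config-gated rule pattern. All names here are
// illustrative stand-ins, not the real Catalyst classes.
object GatedRuleSketch {
  final case class Conf(constraintPropagationEnabled: Boolean)

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Mirrors InferFiltersFromConstraints(conf): consult the flag once per
  // invocation and skip the whole rewrite when propagation is disabled.
  final case class InferFilters(conf: Conf) extends (Plan => Plan) {
    def apply(plan: Plan): Plan =
      if (conf.constraintPropagationEnabled) infer(plan) else plan

    // Toy "inference": a filter on "a > 1" implies "a" must be non-null.
    private def infer(plan: Plan): Plan = plan match {
      case f @ Filter(condition, _) =>
        val column = condition.takeWhile(_ != ' ')
        Filter(s"isnotnull($column)", f)
      case other => other
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = Filter("a > 1", Relation("t"))
    println(InferFilters(Conf(constraintPropagationEnabled = true))(plan))
    println(InferFilters(Conf(constraintPropagationEnabled = false))(plan)) // unchanged
  }
}
```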
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 58e4a230f4..5f7316566b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans._
@@ -439,7 +440,7 @@ case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHe
*
* This rule should be executed before pushing down the Filter
*/
-object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
+case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
/**
* Returns whether the expression returns null or false when all inputs are nulls.
@@ -455,7 +456,8 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
- val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+ val conditions = splitConjunctivePredicates(filter.condition) ++
+ filter.getConstraints(conf.constraintPropagationEnabled)
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index a5761703fd..9fd95a4b36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -187,6 +187,17 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
lazy val constraints: ExpressionSet = ExpressionSet(getRelevantConstraints(validConstraints))
/**
+ * Returns [[constraints]] depending on whether constraint propagation is enabled. If the
+ * flag is disabled, simply returns an empty constraint set.
+ */
+ private[spark] def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet =
+ if (constraintPropagationEnabled) {
+ constraints
+ } else {
+ ExpressionSet(Set.empty)
+ }
+
+ /**
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
* based on the given operator's constraint propagation logic. These constraints are then
* canonicalized and filtered automatically to contain only those attributes that appear in the
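The savings here come from `constraints` being a `lazy val`: a caller that goes through `getConstraints(false)` never forces the derivation at all. A rough model of that behavior, with invented types and an invented derivation:

```scala
// Sketch of a flag-aware accessor over a lazily computed property.
// Node, its fields, and the derivation are made up for illustration.
object GatedConstraintsSketch {
  final case class Node(predicates: Set[String]) {
    // Stands in for QueryPlan.constraints: lazy and potentially expensive.
    lazy val constraints: Set[String] = {
      println("deriving constraints...") // printed only when actually forced
      predicates ++ predicates.map(p => s"isnotnull(${p.takeWhile(_ != ' ')})")
    }

    // Mirrors QueryPlan.getConstraints: when the flag is off, the lazy val
    // is never touched, so no derivation cost is paid.
    def getConstraints(enabled: Boolean): Set[String] =
      if (enabled) constraints else Set.empty
  }

  def main(args: Array[String]): Unit = {
    println(Node(Set("a > 10")).getConstraints(enabled = false)) // Set(); nothing derived
    println(Node(Set("a > 10")).getConstraints(enabled = true))  // full, derived set
  }
}
```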
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d5006c1646..5566b06aa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -187,6 +187,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
+ .internal()
+ .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
+ "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
+ "for certain kinds of query plans (such as those with a large number of predicates and " +
+ "aliases) which might negatively impact overall runtime.")
+ .booleanConf
+ .createWithDefault(true)
+
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
@@ -887,6 +896,8 @@ class SQLConf extends Serializable with Logging {
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
+ def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
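A hedged usage sketch for the new flag, assuming an active `SparkSession` named `spark`: it can presumably be flipped per session either programmatically (as the benchmark above does via the constant's `.key`) or with a SQL `SET` command.

```scala
// Toggling the new session-level flag at runtime; "spark" is assumed to be
// an existing SparkSession. Both forms address the same configuration entry.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sql("SET spark.sql.constraintPropagation.enabled=true")
println(spark.conf.get("spark.sql.constraintPropagation.enabled"))
```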
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index a0d489681f..2bfddb7bc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.rules._
class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
+ val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
- NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
SimplifyBinaryComparison,
- PruneFilters) :: Nil
+ PruneFilters(conf)) :: Nil
}
val nullableRelation = LocalRelation('a.int.withNullability(true))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 1b9db06014..4d404f55aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.rules._
class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
+ val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
- NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
- PruneFilters) :: Nil
+ PruneFilters(conf)) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index 9f57f66a2e..98d8b897a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -31,7 +32,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
PushDownPredicate,
- InferFiltersFromConstraints,
+ InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ CombineFilters) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("InferAndPushDownFilters", FixedPoint(100),
+ PushPredicateThroughJoin,
+ PushDownPredicate,
+ InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
CombineFilters) :: Nil
}
@@ -201,4 +212,10 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
+
+ test("No inferred filter when constraint propagation is disabled") {
+ val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
+ val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery)
+ comparePlans(optimized, originalQuery)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index c168a55e40..cbabc1fa6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -31,7 +32,17 @@ class OuterJoinEliminationSuite extends PlanTest {
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Outer Join Elimination", Once,
- EliminateOuterJoin,
+ EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ PushPredicateThroughJoin) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Outer Join Elimination", Once,
+ EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
PushPredicateThroughJoin) :: Nil
}
@@ -231,4 +242,21 @@ class OuterJoinEliminationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("no outer join elimination if constraint propagation is disabled") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ // If constraint propagation is enabled, constraints such as "x.b != null" and
+ // "y.d != null" will be inferred from the predicate "x.b + y.d >= 3".
+ // When it is disabled, the predicate can't be evaluated against the left or right plan
+ // and used to filter out nulls, so the outer join will not be eliminated.
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where("x.b".attr + "y.d".attr >= 3)
+
+ val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery.analyze)
+
+ comparePlans(optimized, originalQuery.analyze)
+ }
}
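To make the comment in this test concrete: under SQL's three-valued logic, a null-intolerant predicate such as `x.b + y.d >= 3` can never be true when either input is null, so every null-padded row a full outer join produces would be filtered out anyway, which is what licenses rewriting it as an inner join. A tiny standalone illustration (toy modelling, not Catalyst):

```scala
// SQL-style evaluation of "b + d >= 3" with null modelled as None:
// a WHERE clause keeps a row only when the predicate is definitely TRUE.
object NullIntolerantSketch {
  def keeps(b: Option[Int], d: Option[Int]): Boolean =
    (for { x <- b; y <- d } yield x + y >= 3).getOrElse(false)

  def main(args: Array[String]): Unit = {
    println(keeps(Some(2), Some(5))) // true: both sides present
    println(keeps(None, Some(5)))    // false: a left-null-padded outer row is dropped
    println(keeps(Some(2), None))    // false: a right-null-padded outer row is dropped
  }
}
```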
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 908dde7a66..f771e3e9eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
@@ -33,7 +34,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
- PruneFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
PropagateEmptyRelation) :: Nil
}
@@ -45,7 +46,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
- PruneFilters) :: Nil
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
index d8cfec5391..20f7f69e86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,19 @@ class PruneFiltersSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown and Pruning", Once,
CombineFilters,
- PruneFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ PushDownPredicate,
+ PushPredicateThroughJoin) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Filter Pushdown and Pruning", Once,
+ CombineFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
PushDownPredicate,
PushPredicateThroughJoin) :: Nil
}
@@ -133,4 +146,29 @@ class PruneFiltersSuite extends PlanTest {
val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze
comparePlans(optimized, correctAnswer)
}
+
+ test("No pruning when constraint propagation is disabled") {
+ val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+ val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+ val query = tr1
+ .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+ .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
+
+ val queryWithUselessFilter =
+ query.where(
+ ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
+ 'd.attr < 100)
+
+ val optimized =
+ OptimizeWithConstraintPropagationDisabled.execute(queryWithUselessFilter.analyze)
+ // When constraint propagation is disabled, the useless filter won't be pruned;
+ // instead it gets pushed down. Because the `CombineFilters` rule runs only once,
+ // redundant, duplicated filters remain in the plan.
+ val correctAnswer = tr1
+ .where("tr1.a".attr > 10 || "tr1.c".attr < 10).where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+ .join(tr2.where('d.attr < 100).where('d.attr < 100),
+ Inner, Some("tr1.a".attr === "tr2.a".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 21b7f49e14..ca4976f0d6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -34,7 +35,7 @@ class SetOperationSuite extends PlanTest {
CombineUnions,
PushProjectionThroughUnion,
PushDownPredicate,
- PruneFilters) :: Nil
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 908b370408..4061394b86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -397,4 +397,22 @@ class ConstraintPropagationSuite extends SparkFunSuite {
IsNotNull(resolveColumn(tr, "a")),
IsNotNull(resolveColumn(tr, "c")))))
}
+
+ test("enable/disable constraint propagation") {
+ val tr = LocalRelation('a.int, 'b.string, 'c.int)
+ val filterRelation = tr.where('a.attr > 10)
+
+ verifyConstraints(
+ filterRelation.analyze.getConstraints(constraintPropagationEnabled = true),
+ filterRelation.analyze.constraints)
+
+ assert(filterRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty)
+
+ val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5)
+ .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3)
+
+ verifyConstraints(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = true),
+ aliasedRelation.analyze.constraints)
+ assert(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty)
+ }
}