aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala22
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala11
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala19
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala30
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala40
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala3
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala18
13 files changed, 158 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
index ac97987c55..8498cf1c9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
@@ -43,7 +43,8 @@ case class SimpleCatalystConf(
override val starSchemaDetection: Boolean = false,
override val warehousePath: String = "/user/hive/warehouse",
override val sessionLocalTimeZone: String = TimeZone.getDefault().getID,
- override val maxNestedViewDepth: Int = 100)
+ override val maxNestedViewDepth: Int = 100,
+ override val constraintPropagationEnabled: Boolean = true)
extends SQLConf {
override def clone(): SimpleCatalystConf = this.copy()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d7524a57ad..ee7de86921 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -83,12 +83,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
// Operator push down
PushProjectionThroughUnion,
ReorderJoin(conf),
- EliminateOuterJoin,
+ EliminateOuterJoin(conf),
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown(conf),
ColumnPruning,
- InferFiltersFromConstraints,
+ InferFiltersFromConstraints(conf),
// Operator combine
CollapseRepartition,
CollapseProject,
@@ -107,7 +107,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
- PruneFilters,
+ PruneFilters(conf),
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
@@ -615,8 +615,16 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
-object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case class InferFiltersFromConstraints(conf: CatalystConf)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) {
+ inferFilters(plan)
+ } else {
+ plan
+ }
+
+
+ private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val newFilters = filter.constraints --
(child.constraints ++ splitConjunctivePredicates(condition))
@@ -705,7 +713,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
* 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
* 3) by eliminating the always-true conditions given the constraints on the child's output.
*/
-object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
+case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the filter condition always evaluate to true, remove the filter.
case Filter(Literal(true, BooleanType), child) => child
@@ -718,7 +726,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
case f @ Filter(fc, p: LogicalPlan) =>
val (prunedPredicates, remainingPredicates) =
splitConjunctivePredicates(fc).partition { cond =>
- cond.deterministic && p.constraints.contains(cond)
+ cond.deterministic && p.getConstraints(conf.constraintPropagationEnabled).contains(cond)
}
if (prunedPredicates.isEmpty) {
f
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 58e4a230f4..5f7316566b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans._
@@ -439,7 +440,7 @@ case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHe
*
* This rule should be executed before pushing down the Filter
*/
-object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
+case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
/**
* Returns whether the expression returns null or false when all inputs are nulls.
@@ -455,7 +456,8 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
- val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+ val conditions = splitConjunctivePredicates(filter.condition) ++
+ filter.getConstraints(conf.constraintPropagationEnabled)
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index a5761703fd..9fd95a4b36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -187,6 +187,17 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
lazy val constraints: ExpressionSet = ExpressionSet(getRelevantConstraints(validConstraints))
/**
+ * Returns [[constraints]] depending on the config of enabling constraint propagation. If the
+ * flag is disabled, simply returning an empty constraints.
+ */
+ private[spark] def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet =
+ if (constraintPropagationEnabled) {
+ constraints
+ } else {
+ ExpressionSet(Set.empty)
+ }
+
+ /**
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
* based on the given operator's constraint propagation logic. These constraints are then
* canonicalized and filtered automatically to contain only those attributes that appear in the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d5006c1646..5566b06aa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -187,6 +187,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
+ .internal()
+ .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
+ "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
+ "for certain kinds of query plans (such as those with a large number of predicates and " +
+ "aliases) which might negatively impact overall runtime.")
+ .booleanConf
+ .createWithDefault(true)
+
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
@@ -887,6 +896,8 @@ class SQLConf extends Serializable with Logging {
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
+ def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index a0d489681f..2bfddb7bc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.rules._
class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
+ val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
- NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
SimplifyBinaryComparison,
- PruneFilters) :: Nil
+ PruneFilters(conf)) :: Nil
}
val nullableRelation = LocalRelation('a.int.withNullability(true))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 1b9db06014..4d404f55aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.rules._
class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
+ val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
- NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
- PruneFilters) :: Nil
+ PruneFilters(conf)) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index 9f57f66a2e..98d8b897a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -31,7 +32,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
PushDownPredicate,
- InferFiltersFromConstraints,
+ InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ CombineFilters) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("InferAndPushDownFilters", FixedPoint(100),
+ PushPredicateThroughJoin,
+ PushDownPredicate,
+ InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
CombineFilters) :: Nil
}
@@ -201,4 +212,10 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
+
+ test("No inferred filter when constraint propagation is disabled") {
+ val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
+ val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery)
+ comparePlans(optimized, originalQuery)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index c168a55e40..cbabc1fa6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -31,7 +32,17 @@ class OuterJoinEliminationSuite extends PlanTest {
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Outer Join Elimination", Once,
- EliminateOuterJoin,
+ EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ PushPredicateThroughJoin) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Outer Join Elimination", Once,
+ EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
PushPredicateThroughJoin) :: Nil
}
@@ -231,4 +242,21 @@ class OuterJoinEliminationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("no outer join elimination if constraint propagation is disabled") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ // The predicate "x.b + y.d >= 3" will be inferred constraints like:
+ // "x.b != null" and "y.d != null", if constraint propagation is enabled.
+ // When we disable it, the predicate can't be evaluated on left or right plan and used to
+ // filter out nulls. So the Outer Join will not be eliminated.
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where("x.b".attr + "y.d".attr >= 3)
+
+ val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery.analyze)
+
+ comparePlans(optimized, originalQuery.analyze)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 908dde7a66..f771e3e9eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
@@ -33,7 +34,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
- PruneFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
PropagateEmptyRelation) :: Nil
}
@@ -45,7 +46,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
- PruneFilters) :: Nil
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
index d8cfec5391..20f7f69e86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,19 @@ class PruneFiltersSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown and Pruning", Once,
CombineFilters,
- PruneFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ PushDownPredicate,
+ PushPredicateThroughJoin) :: Nil
+ }
+
+ object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Filter Pushdown and Pruning", Once,
+ CombineFilters,
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ constraintPropagationEnabled = false)),
PushDownPredicate,
PushPredicateThroughJoin) :: Nil
}
@@ -133,4 +146,29 @@ class PruneFiltersSuite extends PlanTest {
val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze
comparePlans(optimized, correctAnswer)
}
+
+ test("No pruning when constraint propagation is disabled") {
+ val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+ val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+ val query = tr1
+ .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+ .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
+
+ val queryWithUselessFilter =
+ query.where(
+ ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
+ 'd.attr < 100)
+
+ val optimized =
+ OptimizeWithConstraintPropagationDisabled.execute(queryWithUselessFilter.analyze)
+ // When constraint propagation is disabled, the useless filter won't be pruned.
+ // It gets pushed down. Because the rule `CombineFilters` runs only once, there are redundant
+ // and duplicate filters.
+ val correctAnswer = tr1
+ .where("tr1.a".attr > 10 || "tr1.c".attr < 10).where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+ .join(tr2.where('d.attr < 100).where('d.attr < 100),
+ Inner, Some("tr1.a".attr === "tr2.a".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 21b7f49e14..ca4976f0d6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -34,7 +35,7 @@ class SetOperationSuite extends PlanTest {
CombineUnions,
PushProjectionThroughUnion,
PushDownPredicate,
- PruneFilters) :: Nil
+ PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 908b370408..4061394b86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -397,4 +397,22 @@ class ConstraintPropagationSuite extends SparkFunSuite {
IsNotNull(resolveColumn(tr, "a")),
IsNotNull(resolveColumn(tr, "c")))))
}
+
+ test("enable/disable constraint propagation") {
+ val tr = LocalRelation('a.int, 'b.string, 'c.int)
+ val filterRelation = tr.where('a.attr > 10)
+
+ verifyConstraints(
+ filterRelation.analyze.getConstraints(constraintPropagationEnabled = true),
+ filterRelation.analyze.constraints)
+
+ assert(filterRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty)
+
+ val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5)
+ .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3)
+
+ verifyConstraints(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = true),
+ aliasedRelation.analyze.constraints)
+ assert(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty)
+ }
}