diff options
author | Reynold Xin <rxin@databricks.com> | 2016-02-24 11:58:12 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-24 11:58:12 -0800 |
commit | 382b27babf7771b724f7abff78195a858631d138 (patch) | |
tree | 8d757ef721becded4c3eb86c7a6f756416b3ccf4 | |
parent | f3739869973ba4285196a61775d891292b8e282b (diff) | |
download | spark-382b27babf7771b724f7abff78195a858631d138.tar.gz spark-382b27babf7771b724f7abff78195a858631d138.tar.bz2 spark-382b27babf7771b724f7abff78195a858631d138.zip |
Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning"
This reverts commit f3739869973ba4285196a61775d891292b8e282b.
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 4 | ||||
-rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala) | 35 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 12 |
3 files changed, 9 insertions, 42 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 5d2a65b716..af43cb3786 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -332,10 +332,6 @@ case class Join( */ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - - // We manually set statistics of BroadcastHint to smallest value to make sure - // the plan wrapped by BroadcastHint will be considered to broadcast later. - override def statistics: Statistics = Statistics(sizeInBytes = 1) } case class InsertIntoTable( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala index d482519827..a5b487bcc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -23,18 +23,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins -import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -class JoinOptimizationSuite extends PlanTest { +class JoinOrderSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, EliminateSubqueryAliases) :: - Batch("Filter Pushdown", FixedPoint(100), + Batch("Filter Pushdown", Once, CombineFilters, PushPredicateThroughProject, BooleanSimplification, @@ -92,31 +92,4 @@ class JoinOptimizationSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) } - - test("broadcasthint sets relation statistics to smallest value") { - val input = LocalRelation('key.int, 'value.string) - - val query = - Project(Seq($"x.key", $"y.key"), - Join( - SubqueryAlias("x", input), - BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze - - val optimized = Optimize.execute(query) - - val expected = - Project(Seq($"x.key", $"y.key"), - Join( - Project(Seq($"x.key"), SubqueryAlias("x", input)), - BroadcastHint( - Project(Seq($"y.key"), SubqueryAlias("y", input))), - Inner, None)).analyze - - comparePlans(optimized, expected) - - val broadcastChildren = optimized.collect { - case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r - } - assert(broadcastChildren.size == 1) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 247eb054a8..7347156398 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -81,13 +81,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Matches a plan whose output should be small enough to be used in broadcast join. */ object CanBroadcast { - def unapply(plan: LogicalPlan): Option[LogicalPlan] = { - if (sqlContext.conf.autoBroadcastJoinThreshold > 0 && - plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) { - Some(plan) - } else { - None - } + def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { + case BroadcastHint(p) => Some(p) + case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 && + p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p) + case _ => None } } |