about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author    Reynold Xin <rxin@databricks.com>  2016-02-24 11:58:12 -0800
committer Reynold Xin <rxin@databricks.com>  2016-02-24 11:58:12 -0800
commit 382b27babf7771b724f7abff78195a858631d138 (patch)
tree   8d757ef721becded4c3eb86c7a6f756416b3ccf4 /sql
parent f3739869973ba4285196a61775d891292b8e282b (diff)
downloadspark-382b27babf7771b724f7abff78195a858631d138.tar.gz
spark-382b27babf7771b724f7abff78195a858631d138.tar.bz2
spark-382b27babf7771b724f7abff78195a858631d138.zip
Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning"
This reverts commit f3739869973ba4285196a61775d891292b8e282b.
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 4
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala) | 35
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 12
3 files changed, 9 insertions, 42 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d2a65b716..af43cb3786 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -332,10 +332,6 @@ case class Join(
*/
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
-
- // We manually set statistics of BroadcastHint to smallest value to make sure
- // the plan wrapped by BroadcastHint will be considered to broadcast later.
- override def statistics: Statistics = Statistics(sizeInBytes = 1)
}
case class InsertIntoTable(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
index d482519827..a5b487bcc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -23,18 +23,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-class JoinOptimizationSuite extends PlanTest {
+class JoinOrderSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
- Batch("Filter Pushdown", FixedPoint(100),
+ Batch("Filter Pushdown", Once,
CombineFilters,
PushPredicateThroughProject,
BooleanSimplification,
@@ -92,31 +92,4 @@ class JoinOptimizationSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
-
- test("broadcasthint sets relation statistics to smallest value") {
- val input = LocalRelation('key.int, 'value.string)
-
- val query =
- Project(Seq($"x.key", $"y.key"),
- Join(
- SubqueryAlias("x", input),
- BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
-
- val optimized = Optimize.execute(query)
-
- val expected =
- Project(Seq($"x.key", $"y.key"),
- Join(
- Project(Seq($"x.key"), SubqueryAlias("x", input)),
- BroadcastHint(
- Project(Seq($"y.key"), SubqueryAlias("y", input))),
- Inner, None)).analyze
-
- comparePlans(optimized, expected)
-
- val broadcastChildren = optimized.collect {
- case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
- }
- assert(broadcastChildren.size == 1)
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 247eb054a8..7347156398 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -81,13 +81,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
object CanBroadcast {
- def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
- if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
- plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
- Some(plan)
- } else {
- None
- }
+ def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+ case BroadcastHint(p) => Some(p)
+ case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
+ case _ => None
}
}