aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <huai@cse.ohio-state.edu>2014-06-18 10:51:32 -0700
committerReynold Xin <rxin@apache.org>2014-06-18 10:51:32 -0700
commit587d32012ceeec1e80cec1878312f164cdb76ec8 (patch)
tree21c1da78d4aef80d6f1e2fc78ab5ba3b4282df88 /sql
parent889f7b7624689444ecdb4f0ca16ef78f9bfc8430 (diff)
downloadspark-587d32012ceeec1e80cec1878312f164cdb76ec8.tar.gz
spark-587d32012ceeec1e80cec1878312f164cdb76ec8.tar.bz2
spark-587d32012ceeec1e80cec1878312f164cdb76ec8.zip
[SPARK-2176][SQL] Extra unnecessary exchange operator in the result of an explain command
``` hql("explain select * from src group by key").collect().foreach(println) [ExplainCommand [plan#27:0]] [ Aggregate false, [key#25], [key#25,value#26]] [ Exchange (HashPartitioning [key#25:0], 200)] [ Exchange (HashPartitioning [key#25:0], 200)] [ Aggregate true, [key#25], [key#25]] [ HiveTableScan [key#25,value#26], (MetastoreRelation default, src, None), None] ``` There are two exchange operators. However, if we do not use explain... ``` hql("select * from src group by key") res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[8] at RDD at SchemaRDD.scala:100 == Query Plan == Aggregate false, [key#8], [key#8,value#9] Exchange (HashPartitioning [key#8:0], 200) Aggregate true, [key#8], [key#8] HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None ``` The plan is fine. The cause of this bug is explained below. When we create an `execution.ExplainCommand`, we use the `executedPlan` as the child of this `ExplainCommand`. But, this `executedPlan` is prepared for execution again when we generate the `executedPlan` for the `ExplainCommand`. Basically, `prepareForExecution` is called twice on a physical plan. Because after `prepareForExecution` we have already bounded those references (in `BoundReference`s), `AddExchange` cannot figure out we are using the same partitioning (we use `AttributeReference`s to create an `ExchangeOperator` and then those references will be changed to `BoundReference`s after `prepareForExecution` is called). So, an extra `ExchangeOperator` is inserted. I think in `CommandStrategy`, we should just use the `sparkPlan` (`sparkPlan` is the input of `prepareForExecution`) to initialize the `ExplainCommand` instead of using `executedPlan`. The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2176 Author: Yin Huai <huai@cse.ohio-state.edu> Closes #1116 from yhuai/SPARK-2176 and squashes the following commits: 197c19c [Yin Huai] Use sparkPlan to initialize a Physical Explain Command instead of using executedPlan.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala4
2 files changed, 4 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f7e03323be..1617ec717b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -307,6 +307,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
+ // executedPlan should not be used to initialize any SparkPlan. It should be
+ // only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2233216a6e..70c1171148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -251,8 +251,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
- val executedPlan = context.executePlan(child).executedPlan
- Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
+ val sparkPlan = context.executePlan(child).sparkPlan
+ Seq(execution.ExplainCommand(sparkPlan, plan.output)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommand(tableName, cache)(context))
case _ => Nil