diff options
author | Yin Huai <huai@cse.ohio-state.edu> | 2014-06-18 10:51:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-18 10:51:40 -0700 |
commit | 086ca9c86f36ca57668ccbc33f329f495f5caa7e (patch) | |
tree | 863c9e0e0baf378a4c1a98e30217005d8f8a3009 /sql/core | |
parent | 26f6b989312a9a48a27a23ecc68702bd14032e55 (diff) | |
download | spark-086ca9c86f36ca57668ccbc33f329f495f5caa7e.tar.gz spark-086ca9c86f36ca57668ccbc33f329f495f5caa7e.tar.bz2 spark-086ca9c86f36ca57668ccbc33f329f495f5caa7e.zip |
[SPARK-2176][SQL] Extra unnecessary exchange operator in the result of an explain command
```
hql("explain select * from src group by key").collect().foreach(println)
[ExplainCommand [plan#27:0]]
[ Aggregate false, [key#25], [key#25,value#26]]
[ Exchange (HashPartitioning [key#25:0], 200)]
[ Exchange (HashPartitioning [key#25:0], 200)]
[ Aggregate true, [key#25], [key#25]]
[ HiveTableScan [key#25,value#26], (MetastoreRelation default, src, None), None]
```
There are two exchange operators.
However, if we do not use explain...
```
hql("select * from src group by key")
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[8] at RDD at SchemaRDD.scala:100
== Query Plan ==
Aggregate false, [key#8], [key#8,value#9]
Exchange (HashPartitioning [key#8:0], 200)
Aggregate true, [key#8], [key#8]
HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None
```
The plan is fine.
The cause of this bug is explained below.
When we create an `execution.ExplainCommand`, we use the `executedPlan` as the child of this `ExplainCommand`. But, this `executedPlan` is prepared for execution again when we generate the `executedPlan` for the `ExplainCommand`. Basically, `prepareForExecution` is called twice on a physical plan. Because after `prepareForExecution` we have already bounded those references (in `BoundReference`s), `AddExchange` cannot figure out we are using the same partitioning (we use `AttributeReference`s to create an `ExchangeOperator` and then those references will be changed to `BoundReference`s after `prepareForExecution` is called). So, an extra `ExchangeOperator` is inserted.
I think in `CommandStrategy`, we should just use the `sparkPlan` (`sparkPlan` is the input of `prepareForExecution`) to initialize the `ExplainCommand` instead of using `executedPlan`.
The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2176
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1116 from yhuai/SPARK-2176 and squashes the following commits:
197c19c [Yin Huai] Use sparkPlan to initialize a Physical Explain Command instead of using executedPlan.
(cherry picked from commit 587d32012ceeec1e80cec1878312f164cdb76ec8)
Signed-off-by: Reynold Xin <rxin@apache.org>
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 |
2 files changed, 4 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f7e03323be..1617ec717b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -307,6 +307,8 @@ class SQLContext(@transient val sparkContext: SparkContext) lazy val optimizedPlan = optimizer(analyzed) // TODO: Don't just pick the first one... lazy val sparkPlan = planner(optimizedPlan).next() + // executedPlan should not be used to initialize any SparkPlan. It should be + // only used for execution. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2233216a6e..70c1171148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -251,8 +251,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.SetCommand(key, value) => Seq(execution.SetCommand(key, value, plan.output)(context)) case logical.ExplainCommand(child) => - val executedPlan = context.executePlan(child).executedPlan - Seq(execution.ExplainCommand(executedPlan, plan.output)(context)) + val sparkPlan = context.executePlan(child).sparkPlan + Seq(execution.ExplainCommand(sparkPlan, plan.output)(context)) case logical.CacheCommand(tableName, cache) => Seq(execution.CacheCommand(tableName, cache)(context)) case _ => Nil |