aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYadong Qi <qiyadong2010@gmail.com>2016-09-22 13:04:42 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-22 13:04:42 +0800
commitcb324f61150c962aeabf0a779f6a09797b3d5072 (patch)
treedcf18b09a1fe41acf132f4985b470d14afae95e4
parentb50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0 (diff)
downloadspark-cb324f61150c962aeabf0a779f6a09797b3d5072.tar.gz
spark-cb324f61150c962aeabf0a779f6a09797b3d5072.tar.bz2
spark-cb324f61150c962aeabf0a779f6a09797b3d5072.zip
[SPARK-17425][SQL] Override sameResult in HiveTableScanExec to make ReuseExchange work in text format table
## What changes were proposed in this pull request? The PR will override the `sameResult` in `HiveTableScanExec` to make `ReuseExchange` work in text format table. ## How was this patch tested? # SQL ```sql SELECT * FROM src t1 JOIN src t2 ON t1.key = t2.key JOIN src t3 ON t1.key = t3.key; ``` # Before ``` == Physical Plan == *BroadcastHashJoin [key#30], [key#34], Inner, BuildRight :- *BroadcastHashJoin [key#30], [key#32], Inner, BuildRight : :- *Filter isnotnull(key#30) : : +- HiveTableScan [key#30, value#31], MetastoreRelation default, src : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) : +- *Filter isnotnull(key#32) : +- HiveTableScan [key#32, value#33], MetastoreRelation default, src +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *Filter isnotnull(key#34) +- HiveTableScan [key#34, value#35], MetastoreRelation default, src ``` # After ``` == Physical Plan == *BroadcastHashJoin [key#2], [key#6], Inner, BuildRight :- *BroadcastHashJoin [key#2], [key#4], Inner, BuildRight : :- *Filter isnotnull(key#2) : : +- HiveTableScan [key#2, value#3], MetastoreRelation default, src : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) : +- *Filter isnotnull(key#4) : +- HiveTableScan [key#4, value#5], MetastoreRelation default, src +- ReusedExchange [key#6, value#7], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) ``` cc: davies cloud-fan Author: Yadong Qi <qiyadong2010@gmail.com> Closes #14988 from watermen/SPARK-17425.
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala15
1 files changed, 15 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index a716a3eab6..231f204b12 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -164,4 +164,19 @@ case class HiveTableScanExec(
}
override def output: Seq[Attribute] = attributes
+
+ override def sameResult(plan: SparkPlan): Boolean = plan match {
+ case other: HiveTableScanExec =>
+ val thisPredicates = partitionPruningPred.map(cleanExpression)
+ val otherPredicates = other.partitionPruningPred.map(cleanExpression)
+
+ val result = relation.sameResult(other.relation) &&
+ output.length == other.output.length &&
+ output.zip(other.output)
+ .forall(p => p._1.name == p._2.name && p._1.dataType == p._2.dataType) &&
+ thisPredicates.length == otherPredicates.length &&
+ thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
+ result
+ case _ => false
+ }
}