aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarson Wang <carson.wang@intel.com>2017-03-10 11:13:26 -0800
committerYin Huai <yhuai@databricks.com>2017-03-10 11:13:26 -0800
commitdd9049e0492cc70b629518fee9b3d1632374c612 (patch)
tree9696ca797307a3b3be834f1545be1ca92c33d792
parentfcb68e0f5d49234ac4527109887ff08cd4e1c29f (diff)
downloadspark-dd9049e0492cc70b629518fee9b3d1632374c612.tar.gz
spark-dd9049e0492cc70b629518fee9b3d1632374c612.tar.bz2
spark-dd9049e0492cc70b629518fee9b3d1632374c612.zip
[SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan
## What changes were proposed in this pull request? When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For Join, the same exchange coordinator is used for its two Exchanges. But the physical plan shows two different coordinator Ids which is confusing. This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator. ## How was this patch tested? Before the patch, the physical plan shows two different exchange coordinator id for Join. ``` == Physical Plan == *Project [key1#3L, value2#12L] +- *SortMergeJoin [key1#3L], [key2#11L], Inner :- *Sort [key1#3L ASC NULLS FIRST], false, 0 : +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864] : +- *Project [(id#0L % 500) AS key1#3L] : +- *Filter isnotnull((id#0L % 500)) : +- *Range (0, 1000, step=1, splits=Some(10)) +- *Sort [key2#11L ASC NULLS FIRST], false, 0 +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864] +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L] +- *Filter isnotnull((id#8L % 500)) +- *Range (0, 1000, step=1, splits=Some(10)) ``` After the patch, two exchange coordinator id are the same. Author: Carson Wang <carson.wang@intel.com> Closes #16952 from carsonwang/FixCoordinatorId.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index 125a4930c6..f06544ea8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -46,7 +46,7 @@ case class ShuffleExchange(
override def nodeName: String = {
val extraInfo = coordinator match {
case Some(exchangeCoordinator) =>
- s"(coordinator id: ${System.identityHashCode(coordinator)})"
+ s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
case None => ""
}