diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-10-27 20:26:38 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-10-27 20:26:38 -0700 |
commit | d9c6039897236c3f1e4503aa95c5c9b07b32eadd (patch) | |
tree | 3f9ddb1f1c7b91ef3cb7073cd7094522e48b340f /sql/hive | |
parent | b960a890561eaf3795b93c621bd95be81e56f5b7 (diff) | |
download | spark-d9c6039897236c3f1e4503aa95c5c9b07b32eadd.tar.gz spark-d9c6039897236c3f1e4503aa95c5c9b07b32eadd.tar.bz2 spark-d9c6039897236c3f1e4503aa95c5c9b07b32eadd.zip |
[SPARK-10484] [SQL] Optimize the cartesian join with broadcast join for some cases
In some cases, we can broadcast the smaller relation in cartesian join, which improve the performance significantly.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #8652 from chenghao-intel/cartesian.
Diffstat (limited to 'sql/hive')
6 files changed, 136 insertions, 1 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c328734df3..83a81cf5d1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -588,8 +588,9 @@ class HiveContext private[hive]( LeftSemiJoin, EquiJoinSelection, BasicOperators, + BroadcastNestedLoop, CartesianProduct, - BroadcastNestedLoopJoin + DefaultJoin ) } diff --git a/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #1-0-abfc0b99ee357f71639f6162345fe8e b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #1-0-abfc0b99ee357f71639f6162345fe8e new file mode 100644 index 0000000000..0bb9399af0 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #1-0-abfc0b99ee357f71639f6162345fe8e @@ -0,0 +1,20 @@ +302 0 +302 0 +302 0 +305 0 +305 0 +305 0 +306 0 +306 0 +306 0 +307 0 +307 0 +307 0 +307 0 +307 0 +307 0 +308 0 +308 0 +308 0 +309 0 +309 0 diff --git a/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #2-0-8412a39ee57885ccb0aaf848db8ef1dd b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #2-0-8412a39ee57885ccb0aaf848db8ef1dd new file mode 100644 index 0000000000..4e455ed255 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #2-0-8412a39ee57885ccb0aaf848db8ef1dd @@ -0,0 +1,20 @@ +302 0 +302 0 +302 0 +305 0 +305 0 +305 0 +305 2 +305 4 +306 0 +306 0 +306 0 +306 2 +306 4 +306 5 +306 5 +306 5 +307 0 +307 0 +307 0 +307 0 diff --git a/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #3-0-e8a0427dbde35eea6011144443e5ffb4 b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #3-0-e8a0427dbde35eea6011144443e5ffb4 new file mode 100644 index 0000000000..4e455ed255 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #3-0-e8a0427dbde35eea6011144443e5ffb4 @@ -0,0 +1,20 @@ +302 0 +302 0 +302 0 +305 0 +305 0 +305 0 +305 2 +305 4 +306 0 +306 0 +306 0 +306 2 +306 4 +306 5 +306 5 +306 5 +307 0 +307 0 +307 0 +307 0 diff --git a/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #4-0-45f8602d257655322b7d18cad09f6a0f b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #4-0-45f8602d257655322b7d18cad09f6a0f new file mode 100644 index 0000000000..4e455ed255 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #4-0-45f8602d257655322b7d18cad09f6a0f @@ -0,0 +1,20 @@ +302 0 +302 0 +302 0 +305 0 +305 0 +305 0 +305 2 +305 4 +306 0 +306 0 +306 0 +306 2 +306 4 +306 5 +306 5 +306 5 +307 0 +307 0 +307 0 +307 0 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2878500453..b52f7d4b57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin + import scala.util.Try import org.scalatest.BeforeAndAfter @@ -69,6 +71,58 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } } + // Testing the Broadcast based join for cartesian join (cross join) + // We assume that the Broadcast Join Threshold will works since the src is a small table + private val spark_10484_1 = """ + | SELECT a.key, b.key + | FROM src a LEFT JOIN src b WHERE a.key > b.key + 300 + | ORDER BY b.key, a.key + | LIMIT 20 + """.stripMargin + private val spark_10484_2 = """ + | SELECT a.key, b.key + | FROM src a RIGHT JOIN src b WHERE a.key > b.key + 300 + | ORDER BY a.key, b.key + | LIMIT 20 + """.stripMargin + private val spark_10484_3 = """ + | SELECT a.key, b.key + | FROM src a FULL OUTER JOIN src b WHERE a.key > b.key + 300 + | ORDER BY a.key, b.key + | LIMIT 20 + """.stripMargin + private val spark_10484_4 = """ + | SELECT a.key, b.key + | FROM src a JOIN src b WHERE a.key > b.key + 300 + | ORDER BY a.key, b.key + | LIMIT 20 + """.stripMargin + + createQueryTest("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #1", + spark_10484_1) + + createQueryTest("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #2", + spark_10484_2) + + createQueryTest("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #3", + spark_10484_3) + + createQueryTest("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN #4", + spark_10484_4) + + test("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN") { + def assertBroadcastNestedLoopJoin(sqlText: String): Unit = { + assert(sql(sqlText).queryExecution.sparkPlan.collect { + case _: BroadcastNestedLoopJoin => 1 + }.nonEmpty) + } + + assertBroadcastNestedLoopJoin(spark_10484_1) + assertBroadcastNestedLoopJoin(spark_10484_2) + assertBroadcastNestedLoopJoin(spark_10484_3) + assertBroadcastNestedLoopJoin(spark_10484_4) + } + createQueryTest("SPARK-8976 Wrong Result for Rollup #1", """ SELECT count(*) AS cnt, key % 5,GROUPING__ID FROM src group by key%5 WITH ROLLUP |