diff options
author | Davies Liu <davies@databricks.com> | 2015-10-20 13:40:24 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-20 13:40:24 -0700 |
commit | 67d468f8d9172569ec9846edc6432240547696dd (patch) | |
tree | d9edf59834ee77c564b58ee12387db8ad17a9145 /sql/core | |
parent | 478c7ce8628c05ebce2972e631d76317accebe9c (diff) | |
download | spark-67d468f8d9172569ec9846edc6432240547696dd.tar.gz spark-67d468f8d9172569ec9846edc6432240547696dd.tar.bz2 spark-67d468f8d9172569ec9846edc6432240547696dd.zip |
[SPARK-11111] [SQL] fast null-safe join
Currently, we use CartesianProduct for join with null-safe-equal condition.
```
scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
== Physical Plan ==
TungstenProject [i#2,j#3,i#7,j#8]
Filter (i#2 <=> i#7)
CartesianProduct
LocalTableScan [i#2,j#3], [[1,1]]
LocalTableScan [i#7,j#8], [[1,1]]
```
Actually, we can rewrite it as an equal-join condition, `coalesce(a.i, default) = coalesce(b.i, default)`, so that a partitioned join algorithm can be used.
After this PR, the plan will become:
```
>>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
TungstenProject [id#0L,id#1L]
Filter (id#0L <=> id#1L)
SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
TungstenSort [coalesce(id#0L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#0L]
TungstenSort [coalesce(id#1L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#1L]
```
Author: Davies Liu <davies@databricks.com>
Closes #9120 from davies/null_safe.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 14 |
2 files changed, 28 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 636591630e..a35a7f41dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.aggregate +import org.apache.spark.sql.execution.joins.{SortMergeJoin, CartesianProduct} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} @@ -850,6 +851,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row(null, null, 6, "F") :: Nil) } + test("SPARK-11111 null-safe join should not use cartesian product") { + val df = sql("select count(*) from testData a join testData b on (a.key <=> b.key)") + val cp = df.queryExecution.executedPlan.collect { + case cp: CartesianProduct => cp + } + assert(cp.isEmpty, "should not use CartesianProduct for null-safe join") + val smj = df.queryExecution.executedPlan.collect { + case smj: SortMergeJoin => smj + } + assert(smj.size > 0, "should use SortMergeJoin") + checkAnswer(df, Row(100) :: Nil) + } + test("SPARK-3349 partitioning after limit") { sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC") .limit(2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 4174ee0550..da58e96f3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -212,4 +212,18 @@ class InnerJoinSuite extends SparkPlanTest 
with SharedSQLContext { ) } + { + lazy val left = Seq((1, Some(0)), (2, None)).toDF("a", "b") + lazy val right = Seq((1, Some(0)), (2, None)).toDF("a", "b") + testInnerJoin( + "inner join, null safe", + left, + right, + () => (left.col("b") <=> right.col("b")).expr, + Seq( + (1, 0, 1, 0), + (2, null, 2, null) + ) + ) + } } |