path: root/sql/core
author     Davies Liu <davies@databricks.com>    2015-10-20 13:40:24 -0700
committer  Davies Liu <davies.liu@gmail.com>     2015-10-20 13:40:24 -0700
commit     67d468f8d9172569ec9846edc6432240547696dd (patch)
tree       d9edf59834ee77c564b58ee12387db8ad17a9145 /sql/core
parent     478c7ce8628c05ebce2972e631d76317accebe9c (diff)
[SPARK-11111] [SQL] fast null-safe join
Currently, we use CartesianProduct for a join with a null-safe-equal condition:

```
scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
== Physical Plan ==
TungstenProject [i#2,j#3,i#7,j#8]
 Filter (i#2 <=> i#7)
  CartesianProduct
   LocalTableScan [i#2,j#3], [[1,1]]
   LocalTableScan [i#7,j#8], [[1,1]]
```

Actually, we can express it as an equi-join condition, `coalesce(a.i, default) = coalesce(b.i, default)`, so that a partitioned join algorithm can be used. After this PR, the plan becomes:

```
>>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
TungstenProject [id#0L,id#1L]
 Filter (id#0L <=> id#1L)
  SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
   TungstenSort [coalesce(id#0L,0) ASC], false, 0
    TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
     ConvertToUnsafe
      Scan PhysicalRDD[id#0L]
   TungstenSort [coalesce(id#1L,0) ASC], false, 0
    TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
     ConvertToUnsafe
      Scan PhysicalRDD[id#1L]
```

Author: Davies Liu <davies@databricks.com>

Closes #9120 from davies/null_safe.
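The rewrite itself happens on the Catalyst side (outside the `sql/core` tests shown below), where the planner extracts equi-join keys from the join condition. A minimal sketch of the idea, using Catalyst expression classes from this era of Spark (`EqualNullSafe`, `Coalesce`, `Literal.default`); the helper name `toEquiJoinKeys` is illustrative, not the actual code in `ExtractEquiJoinKeys`:

```scala
import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualNullSafe, Expression, Literal}

// Sketch: turn a null-safe equality `l <=> r` into a pair of hashable
// equi-join keys by mapping NULL to the type's default value on each side.
// Two rows satisfying `l <=> r` always produce equal keys, so they land in
// the same partition. The reverse does not hold (a NULL and a genuine
// default value both map to the default), which is why the plan above keeps
// `Filter (id#0L <=> id#1L)` after the SortMergeJoin.
def toEquiJoinKeys(cond: Expression): Option[(Expression, Expression)] = cond match {
  case EqualNullSafe(l, r) =>
    Some((Coalesce(Seq(l, Literal.default(l.dataType))),
          Coalesce(Seq(r, Literal.default(r.dataType)))))
  case _ => None
}
```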
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                   14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala  14
2 files changed, 28 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 636591630e..a35a7f41dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.joins.{SortMergeJoin, CartesianProduct}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -850,6 +851,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(null, null, 6, "F") :: Nil)
   }
 
+  test("SPARK-11111 null-safe join should not use cartesian product") {
+    val df = sql("select count(*) from testData a join testData b on (a.key <=> b.key)")
+    val cp = df.queryExecution.executedPlan.collect {
+      case cp: CartesianProduct => cp
+    }
+    assert(cp.isEmpty, "should not use CartesianProduct for null-safe join")
+    val smj = df.queryExecution.executedPlan.collect {
+      case smj: SortMergeJoin => smj
+    }
+    assert(smj.size > 0, "should use SortMergeJoin")
+    checkAnswer(df, Row(100) :: Nil)
+  }
+
   test("SPARK-3349 partitioning after limit") {
     sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
       .limit(2)
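The check the new test performs can also be reproduced by hand; a sketch for a 1.5-era `spark-shell` (assumes a registered `testData` table like the suite fixture; any two-column table works):

```scala
import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}

val df = sqlContext.sql(
  "select count(*) from testData a join testData b on (a.key <=> b.key)")

// Walk the physical plan: a null-safe join should now plan as a
// SortMergeJoin on the coalesced keys, never as a CartesianProduct.
df.queryExecution.executedPlan.foreach {
  case j: SortMergeJoin =>
    println(s"sort-merge join on ${j.leftKeys} / ${j.rightKeys}")
  case c: CartesianProduct =>
    sys.error("unexpected cartesian product")
  case _ => // scans, exchanges, aggregates, etc. are expected
}
```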
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 4174ee0550..da58e96f3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -212,4 +212,18 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
     )
   }
 
+  {
+    lazy val left = Seq((1, Some(0)), (2, None)).toDF("a", "b")
+    lazy val right = Seq((1, Some(0)), (2, None)).toDF("a", "b")
+    testInnerJoin(
+      "inner join, null safe",
+      left,
+      right,
+      () => (left.col("b") <=> right.col("b")).expr,
+      Seq(
+        (1, 0, 1, 0),
+        (2, null, 2, null)
+      )
+    )
+  }
 }
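For reference, a usage sketch of what the new `InnerJoinSuite` case exercises, runnable in a `spark-shell` (assumes the shell's `sqlContext`; the data and column names mirror the test fixture above):

```scala
import sqlContext.implicits._

val left  = Seq((1, Some(0)), (2, None)).toDF("a", "b")
val right = Seq((1, Some(0)), (2, None)).toDF("a", "b")

// `<=>` treats NULL as equal to NULL, so the (2, None) rows join with each
// other; a plain `===` would drop them because `NULL = NULL` is not true.
left.join(right, left("b") <=> right("b")).show()
// +---+----+---+----+
// |  a|   b|  a|   b|
// +---+----+---+----+
// |  1|   0|  1|   0|
// |  2|null|  2|null|
// +---+----+---+----+
```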