aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
author Dilip Biswal <dbiswal@us.ibm.com> 2016-03-17 10:01:41 -0700
committer Michael Armbrust <michael@databricks.com> 2016-03-17 10:01:41 -0700
commit 637a78f1d3dff00658324de3887d75c5ccd857be (patch)
tree 7af264456d6f3d17e62f6ecdb9f8e9db8d34f9c2 /sql/core
parent 65b75e66e89075e23ccaa3813648397b3ec010fe (diff)
downloadspark-637a78f1d3dff00658324de3887d75c5ccd857be.tar.gz
spark-637a78f1d3dff00658324de3887d75c5ccd857be.tar.bz2
spark-637a78f1d3dff00658324de3887d75c5ccd857be.zip
[SPARK-13427][SQL] Support USING clause in JOIN.
## What changes were proposed in this pull request? Support queries that JOIN tables with a USING clause: SELECT * FROM table1 JOIN table2 USING <column_list>. The USING clause can be used to simplify the join condition when: 1) equijoin semantics are desired and 2) the columns in the equijoin have the same name in both tables. Spark already supports natural join. This PR reuses the existing natural-join infrastructure to form both the join condition and the projection list. ## How was this patch tested? Added unit tests in SQLQuerySuite, CatalystQlSuite, and ResolveNaturalJoinSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #11297 from dilipbiswal/spark-13427.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala39
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala64
2 files changed, 69 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ac2ca3c5a3..75f1ffd51f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -490,41 +490,12 @@ class Dataset[T] private[sql](
Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
.analyzed.asInstanceOf[Join]
- val condition = usingColumns.map { col =>
- catalyst.expressions.EqualTo(
- withPlan(joined.left).resolve(col),
- withPlan(joined.right).resolve(col))
- }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
- catalyst.expressions.And(cond, eqTo)
- }
-
- // Project only one of the join columns.
- val joinedCols = JoinType(joinType) match {
- case Inner | LeftOuter | LeftSemi =>
- usingColumns.map(col => withPlan(joined.left).resolve(col))
- case RightOuter =>
- usingColumns.map(col => withPlan(joined.right).resolve(col))
- case FullOuter =>
- usingColumns.map { col =>
- val leftCol = withPlan(joined.left).resolve(col).toAttribute.withNullability(true)
- val rightCol = withPlan(joined.right).resolve(col).toAttribute.withNullability(true)
- Alias(Coalesce(Seq(leftCol, rightCol)), col)()
- }
- case NaturalJoin(_) => sys.error("NaturalJoin with using clause is not supported.")
- }
- // The nullability of output of joined could be different than original column,
- // so we can only compare them by exprId
- val joinRefs = AttributeSet(condition.toSeq.flatMap(_.references))
- val resultCols = joinedCols ++ joined.output.filterNot(joinRefs.contains(_))
withPlan {
- Project(
- resultCols,
- Join(
- joined.left,
- joined.right,
- joinType = JoinType(joinType),
- condition)
- )
+ Join(
+ joined.left,
+ joined.right,
+ UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))),
+ None)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3efe984c09..6716982118 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2179,4 +2179,68 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(4) :: Nil)
}
}
+
+ test("join with using clause") {
+ val df1 = Seq(("r1c1", "r1c2", "t1r1c3"),
+ ("r2c1", "r2c2", "t1r2c3"), ("r3c1x", "r3c2", "t1r3c3")).toDF("c1", "c2", "c3")
+ val df2 = Seq(("r1c1", "r1c2", "t2r1c3"),
+ ("r2c1", "r2c2", "t2r2c3"), ("r3c1y", "r3c2", "t2r3c3")).toDF("c1", "c2", "c3")
+ val df3 = Seq((null, "r1c2", "t3r1c3"),
+ ("r2c1", "r2c2", "t3r2c3"), ("r3c1y", "r3c2", "t3r3c3")).toDF("c1", "c2", "c3")
+ withTempTable("t1", "t2", "t3") {
+ df1.registerTempTable("t1")
+ df2.registerTempTable("t2")
+ df3.registerTempTable("t3")
+ // inner join with one using column
+ checkAnswer(
+ sql("SELECT * FROM t1 join t2 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", "r1c2", "t2r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t2r2c3") :: Nil)
+
+ // inner join with two using columns
+ checkAnswer(
+ sql("SELECT * FROM t1 join t2 using (c1, c2)"),
+ Row("r1c1", "r1c2", "t1r1c3", "t2r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "t2r2c3") :: Nil)
+
+ // Left outer join with one using column.
+ checkAnswer(
+ sql("SELECT * FROM t1 left join t2 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", "r1c2", "t2r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t2r2c3") ::
+ Row("r3c1x", "r3c2", "t1r3c3", null, null) :: Nil)
+
+ // Right outer join with one using column.
+ checkAnswer(
+ sql("SELECT * FROM t1 right join t2 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", "r1c2", "t2r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t2r2c3") ::
+ Row("r3c1y", null, null, "r3c2", "t2r3c3") :: Nil)
+
+ // Full outer join with one using column.
+ checkAnswer(
+ sql("SELECT * FROM t1 full outer join t2 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", "r1c2", "t2r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t2r2c3") ::
+ Row("r3c1x", "r3c2", "t1r3c3", null, null) ::
+ Row("r3c1y", null,
+ null, "r3c2", "t2r3c3") :: Nil)
+
+ // Full outer join with null value in join column.
+ checkAnswer(
+ sql("SELECT * FROM t1 full outer join t3 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", null, null) ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t3r2c3") ::
+ Row("r3c1x", "r3c2", "t1r3c3", null, null) ::
+ Row("r3c1y", null, null, "r3c2", "t3r3c3") ::
+ Row(null, null, null, "r1c2", "t3r1c3") :: Nil)
+
+ // Self join with using columns.
+ checkAnswer(
+ sql("SELECT * FROM t1 join t1 using (c1)"),
+ Row("r1c1", "r1c2", "t1r1c3", "r1c2", "t1r1c3") ::
+ Row("r2c1", "r2c2", "t1r2c3", "r2c2", "t1r2c3") ::
+ Row("r3c1x", "r3c2", "t1r3c3", "r3c2", "t1r3c3") :: Nil)
+ }
+ }
}