aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-06-08 23:27:05 -0700
committerReynold Xin <rxin@databricks.com>2015-06-08 23:27:05 -0700
commit7658eb28a2ea28c06e3b5a26f7734a7dc36edc19 (patch)
tree223a4bfef4f1abceeb471bb23fc371fe96a6f5a0 /sql/core
parenta5c52c1a3488b69bec19e460d2d1fdb0c9ada58d (diff)
downloadspark-7658eb28a2ea28c06e3b5a26f7734a7dc36edc19.tar.gz
spark-7658eb28a2ea28c06e3b5a26f7734a7dc36edc19.tar.bz2
spark-7658eb28a2ea28c06e3b5a26f7734a7dc36edc19.zip
[SPARK-7990][SQL] Add methods to facilitate equi-join on multiple joining keys
JIRA: https://issues.apache.org/jira/browse/SPARK-7990 Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #6616 from viirya/multi_keys_equi_join and squashes the following commits: cd5c888 [Liang-Chi Hsieh] Import reduce in python3. c43722c [Liang-Chi Hsieh] For comments. 0400e89 [Liang-Chi Hsieh] Fix scala style. cc90015 [Liang-Chi Hsieh] Add methods to facilitate equi-join on multiple joining keys.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala40
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala9
2 files changed, 43 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4a224153e1..59f64dd4bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -395,22 +395,50 @@ class DataFrame private[sql](
* @since 1.4.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
+ join(right, Seq(usingColumn))
+ }
+
+ /**
+ * Inner equi-join with another [[DataFrame]] using the given columns.
+ *
+ * Different from other join functions, the join columns will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * {{{
+ * // Joining df1 and df2 using the columns "user_id" and "user_name"
+ * df1.join(df2, Seq("user_id", "user_name"))
+ * }}}
+ *
+ * Note that if you perform a self-join using this function without aliasing the input
+ * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+ * there is no way to disambiguate which side of the join you would like to reference.
+ *
+ * @param right Right side of the join operation.
+ * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+ * @group dfops
+ * @since 1.4.0
+ */
+ def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branches.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
- // Project only one of the join column.
- val joinedCol = joined.right.resolve(usingColumn)
+ // Project only one of the join columns.
+ val joinedCols = usingColumns.map(col => joined.right.resolve(col))
+ val condition = usingColumns.map { col =>
+ catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
+ }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
+ catalyst.expressions.And(cond, eqTo)
+ }
+
Project(
- joined.output.filterNot(_ == joinedCol),
+ joined.output.filterNot(joinedCols.contains(_)),
Join(
joined.left,
joined.right,
joinType = Inner,
- Some(catalyst.expressions.EqualTo(
- joined.left.resolve(usingColumn),
- joined.right.resolve(usingColumn))))
+ condition)
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 051d13e9a5..6165764632 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -34,6 +34,15 @@ class DataFrameJoinSuite extends QueryTest {
Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
}
+ test("join - join using multiple columns") {
+ val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+ val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+ checkAnswer(
+ df.join(df2, Seq("int", "int2")),
+ Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
+ }
+
test("join - join using self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")