aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-09-21 23:46:00 -0700
committerReynold Xin <rxin@databricks.com>2015-09-21 23:46:00 -0700
commit1fcefef06950e2f03477282368ca835bbf40ff24 (patch)
tree9fd7829e91af85b3f682f677721ca00b1c96eb30
parent781b21ba2a873ed29394c8dbc74fc700e3e0d17e (diff)
downloadspark-1fcefef06950e2f03477282368ca835bbf40ff24.tar.gz
spark-1fcefef06950e2f03477282368ca835bbf40ff24.tar.bz2
spark-1fcefef06950e2f03477282368ca835bbf40ff24.zip
[SPARK-10446][SQL] Support to specify join type when calling join with usingColumns
JIRA: https://issues.apache.org/jira/browse/SPARK-10446 Currently the method `join(right: DataFrame, usingColumns: Seq[String])` only supports inner join. It is more convenient to have it support other join types. Author: Liang-Chi Hsieh <viirya@appier.com> Closes #8600 from viirya/usingcolumns_df.
-rw-r--r--python/pyspark/sql/dataframe.py6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala22
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala13
3 files changed, 39 insertions, 2 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index fb995fa3a7..80f8d8a0eb 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -567,7 +567,11 @@ class DataFrame(object):
if on is None or len(on) == 0:
jdf = self._jdf.join(other._jdf)
elif isinstance(on[0], basestring):
- jdf = self._jdf.join(other._jdf, self._jseq(on))
+ if how is None:
+ jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+ else:
+ assert isinstance(how, basestring), "how should be basestring"
+ jdf = self._jdf.join(other._jdf, self._jseq(on), how)
else:
assert isinstance(on[0], Column), "on should be Column or list of Column"
if len(on) > 1:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8f737c2023..ba94d77b2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -484,6 +484,26 @@ class DataFrame private[sql](
* @since 1.4.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+ join(right, usingColumns, "inner")
+ }
+
+ /**
+ * Equi-join with another [[DataFrame]] using the given columns.
+ *
+ * Different from other join functions, the join columns will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * Note that if you perform a self-join using this function without aliasing the input
+ * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+ * there is no way to disambiguate which side of the join you would like to reference.
+ *
+ * @param right Right side of the join operation.
+ * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+ * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+ * @group dfops
+ * @since 1.6.0
+ */
+ def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sqlContext.executePlan(
@@ -502,7 +522,7 @@ class DataFrame private[sql](
Join(
joined.left,
joined.right,
- joinType = Inner,
+ joinType = JoinType(joinType),
condition)
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e2716d7841..56ad71ea4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
}
+ test("join - join using multiple columns and specifying join type") {
+ val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+ val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+ checkAnswer(
+ df.join(df2, Seq("int", "str"), "left"),
+ Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)
+
+ checkAnswer(
+ df.join(df2, Seq("int", "str"), "right"),
+ Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
+ }
+
test("join - join using self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")