aboutsummaryrefslogtreecommitdiff
path: root/R/pkg
diff options
context:
space:
mode:
authorFelix Cheung <felixcheung_m@hotmail.com>2016-10-21 12:35:37 -0700
committerFelix Cheung <felixcheung@apache.org>2016-10-21 12:35:37 -0700
commite21e1c946c4b7448fb150cfa2d9419864ae6f9b5 (patch)
tree44da3487f4b5403b889fc5e4f33a40a679f20e9a /R/pkg
parent4efdc764edfbc4971f0e863947258482ca2017df (diff)
downloadspark-e21e1c946c4b7448fb150cfa2d9419864ae6f9b5.tar.gz
spark-e21e1c946c4b7448fb150cfa2d9419864ae6f9b5.tar.bz2
spark-e21e1c946c4b7448fb150cfa2d9419864ae6f9b5.zip
[SPARK-18013][SPARKR] add crossJoin API
## What changes were proposed in this pull request?

Add crossJoin and do not default to cross join if joinExpr is left out

## How was this patch tested?

unit test

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #15559 from felixcheung/rcrossjoin.
Diffstat (limited to 'R/pkg')
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/DataFrame.R59
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R11
4 files changed, 60 insertions, 15 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 5960c6206a..8718185171 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods("arrange",
"covar_samp",
"covar_pop",
"createOrReplaceTempView",
+ "crossJoin",
"crosstab",
"dapply",
"dapplyCollect",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 801d2ed4e7..8910a4b138 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2271,12 +2271,13 @@ setMethod("dropDuplicates",
#' Join
#'
-#' Join two SparkDataFrames based on the given join expression.
+#' Joins two SparkDataFrames based on the given join expression.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
+#' Column expression. If joinExpr is omitted, the default inner join is attempted and an error
+#' is thrown if it would be a Cartesian product. For a Cartesian join, use crossJoin instead.
#' @param joinType The type of join to perform. The following join types are available:
#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
@@ -2285,23 +2286,24 @@ setMethod("dropDuplicates",
#' @aliases join,SparkDataFrame,SparkDataFrame-method
#' @rdname join
#' @name join
-#' @seealso \link{merge}
+#' @seealso \link{merge} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
-#' join(df1, df2) # Performs a Cartesian
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
+#' join(df1, df2) # Attempts an inner join
#' }
#' @note join since 1.4.0
setMethod("join",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL) {
if (is.null(joinExpr)) {
- sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
+ # this may not fail until the planner checks for Cartesian join later on.
+ sdf <- callJMethod(x@sdf, "join", y@sdf)
} else {
if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
if (is.null(joinType)) {
@@ -2322,22 +2324,52 @@ setMethod("join",
dataFrame(sdf)
})
+#' CrossJoin
+#'
+#' Returns the Cartesian product of two SparkDataFrames.
+#'
+#' @param x A SparkDataFrame; the object the cross join is invoked on.
+#' @param y A SparkDataFrame; the object passed as the other operand of the cross join.
+#' @return A SparkDataFrame containing the result of the join operation.
+#' @family SparkDataFrame functions
+#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
+#' @rdname crossJoin
+#' @name crossJoin
+#' @seealso \link{merge} \link{join}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' crossJoin(df1, df2) # Performs a Cartesian join
+#' }
+#' @note crossJoin since 2.1.0
+setMethod("crossJoin",
+ signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+ function(x, y) {
+ sdf <- callJMethod(x@sdf, "crossJoin", y@sdf) # explicit cross join; no join expression involved
+ dataFrame(sdf)
+ })
+
#' Merges two data frames
#'
#' @name merge
-#' @param x the first data frame to be joined
-#' @param y the second data frame to be joined
+#' @param x the first data frame to be joined.
+#' @param y the second data frame to be joined.
#' @param by a character vector specifying the join columns. If by is not
#' specified, the common column names in \code{x} and \code{y} will be used.
+#' If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
+#' Product of x and y will be returned.
#' @param by.x a character vector specifying the joining columns for x.
#' @param by.y a character vector specifying the joining columns for y.
#' @param all a boolean value setting \code{all.x} and \code{all.y}
#' if any of them are unset.
#' @param all.x a boolean value indicating whether all the rows in x should
-#' be including in the join
+#' be included in the join.
#' @param all.y a boolean value indicating whether all the rows in y should
+#' be included in the join.
-#' @param sort a logical argument indicating whether the resulting columns should be sorted
+#' be including in the join.
+#' @param sort a logical argument indicating whether the resulting columns should be sorted.
#' @param suffixes a string vector of length 2 used to make colnames of
#' \code{x} and \code{y} unique.
#' The first element is appended to each colname of \code{x}.
@@ -2351,20 +2383,21 @@ setMethod("join",
#' @family SparkDataFrame functions
#' @aliases merge,SparkDataFrame,SparkDataFrame-method
#' @rdname merge
-#' @seealso \link{join}
+#' @seealso \link{join} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
-#' merge(df1, df2) # Performs a Cartesian
+#' merge(df1, df2) # Performs an inner join by common columns
#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
+#' merge(df1, df2, by = NULL) # Performs a Cartesian join
#' }
#' @note merge since 1.5.0
setMethod("merge",
@@ -2401,7 +2434,7 @@ setMethod("merge",
joinY <- by
} else {
# if by or both by.x and by.y have length 0, use Cartesian Product
- joinRes <- join(x, y)
+ joinRes <- crossJoin(x, y)
return (joinRes)
}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 810aea9017..5549cd7cac 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -468,6 +468,10 @@ setGeneric("createOrReplaceTempView",
standardGeneric("createOrReplaceTempView")
})
+#' @rdname crossJoin
+#' @export
+setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })
+
#' @rdname dapply
#' @export
setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1c806869e9..3a987cd862 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1572,7 +1572,7 @@ test_that("filter() on a DataFrame", {
#expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
})
-test_that("join() and merge() on a DataFrame", {
+test_that("join(), crossJoin() and merge() on a DataFrame", {
df <- read.json(jsonPath)
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
@@ -1583,7 +1583,14 @@ test_that("join() and merge() on a DataFrame", {
writeLines(mockLines2, jsonPath2)
df2 <- read.json(jsonPath2)
- joined <- join(df, df2)
+ # inner join, not cartesian join
+ expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
+ # cartesian join
+ expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
+ paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
+ " INNER join between logical plans).*"))
+
+ joined <- crossJoin(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
expect_equal(count(joined), 12)
expect_equal(names(collect(joined)), c("age", "name", "name", "test"))