aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorSun Rui <rui.sun@intel.com>2016-01-19 16:37:18 -0800
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2016-01-19 16:37:18 -0800
commit3ac648289c543b56937d67b5df5c3e228ef47cbd (patch)
tree74c38841840ce85970ce4883eec9a2c0cffbc546 /R
parent37fefa66cbd61bc592aba42b0ed3aefc0cf3abb0 (diff)
downloadspark-3ac648289c543b56937d67b5df5c3e228ef47cbd.tar.gz
spark-3ac648289c543b56937d67b5df5c3e228ef47cbd.tar.bz2
spark-3ac648289c543b56937d67b5df5c3e228ef47cbd.zip
[SPARK-12337][SPARKR] Implement dropDuplicates() method of DataFrame in SparkR.
Author: Sun Rui <rui.sun@intel.com> Closes #10309 from sun-rui/SPARK-12337.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/DataFrame.R30
-rw-r--r--R/pkg/R/generics.R7
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R38
4 files changed, 75 insertions, 1 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 27d2f9822f..7739e9ea86 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
"describe",
"dim",
"distinct",
+ "dropDuplicates",
"dropna",
"dtypes",
"except",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 35695b9df1..629c1ce2ed 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1645,6 +1645,36 @@ setMethod("where",
filter(x, condition)
})
+#' dropDuplicates
+#'
+#' Returns a new DataFrame with duplicate rows removed, considering only
+#' the subset of columns.
+#'
+#' @param x A DataFrame.
+#' @param colnames A character vector of column names.
+#' @return A DataFrame with duplicate rows removed.
+#' @family DataFrame functions
+#' @rdname dropduplicates
+#' @name dropDuplicates
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' dropDuplicates(df)
+#' dropDuplicates(df, c("col1", "col2"))
+#' }
+setMethod("dropDuplicates",
+ signature(x = "DataFrame"),
+ function(x, colNames = columns(x)) {
+ stopifnot(class(colNames) == "character")
+
+ sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(colNames))
+ dataFrame(sdf)
+ })
+
#' Join
#'
#' Join two DataFrames based on the given join expression.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 860329988f..d616266ead 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -428,6 +428,13 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
+#' @rdname dropduplicates
+#' @export
+setGeneric("dropDuplicates",
+ function(x, colNames = columns(x)) {
+ standardGeneric("dropDuplicates")
+ })
+
#' @rdname nafunctions
#' @export
setGeneric("dropna",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 67ecdbc522..6610734cf4 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -734,7 +734,7 @@ test_that("head() and first() return the correct data", {
expect_equal(ncol(testFirst), 2)
})
-test_that("distinct() and unique on DataFrames", {
+test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
lines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
@@ -750,6 +750,42 @@ test_that("distinct() and unique on DataFrames", {
uniques2 <- unique(df)
expect_is(uniques2, "DataFrame")
expect_equal(count(uniques2), 3)
+
+ # Test dropDuplicates()
+ df <- createDataFrame(
+ sqlContext,
+ list(
+ list(2, 1, 2), list(1, 1, 1),
+ list(1, 2, 1), list(2, 1, 2),
+ list(2, 2, 2), list(2, 2, 1),
+ list(2, 1, 1), list(1, 1, 2),
+ list(1, 2, 2), list(1, 2, 1)),
+ schema = c("key", "value1", "value2"))
+ result <- collect(dropDuplicates(df))
+ expected <- rbind.data.frame(
+ c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
+ c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
+ c(2, 2, 1), c(2, 2, 2))
+ names(expected) <- c("key", "value1", "value2")
+ expect_equivalent(
+ result[order(result$key, result$value1, result$value2),],
+ expected)
+
+ result <- collect(dropDuplicates(df, c("key", "value1")))
+ expected <- rbind.data.frame(
+ c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
+ names(expected) <- c("key", "value1", "value2")
+ expect_equivalent(
+ result[order(result$key, result$value1, result$value2),],
+ expected)
+
+ result <- collect(dropDuplicates(df, "key"))
+ expected <- rbind.data.frame(
+ c(1, 1, 1), c(2, 1, 2))
+ names(expected) <- c("key", "value1", "value2")
+ expect_equivalent(
+ result[order(result$key, result$value1, result$value2),],
+ expected)
})
test_that("sample on a DataFrame", {