From 3ac648289c543b56937d67b5df5c3e228ef47cbd Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 19 Jan 2016 16:37:18 -0800 Subject: [SPARK-12337][SPARKR] Implement dropDuplicates() method of DataFrame in SparkR. Author: Sun Rui Closes #10309 from sun-rui/SPARK-12337. --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 30 ++++++++++++++++++++++++ R/pkg/R/generics.R | 7 ++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 38 ++++++++++++++++++++++++++++++- 4 files changed, 75 insertions(+), 1 deletion(-) (limited to 'R/pkg') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 27d2f9822f..7739e9ea86 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -39,6 +39,7 @@ exportMethods("arrange", "describe", "dim", "distinct", + "dropDuplicates", "dropna", "dtypes", "except", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 35695b9df1..629c1ce2ed 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1645,6 +1645,36 @@ setMethod("where", filter(x, condition) }) +#' dropDuplicates +#' +#' Returns a new DataFrame with duplicate rows removed, considering only +#' the subset of columns. +#' +#' @param x A DataFrame. +#' @param colnames A character vector of column names. +#' @return A DataFrame with duplicate rows removed. +#' @family DataFrame functions +#' @rdname dropduplicates +#' @name dropDuplicates +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' path <- "path/to/file.json" +#' df <- read.json(sqlContext, path) +#' dropDuplicates(df) +#' dropDuplicates(df, c("col1", "col2")) +#' } +setMethod("dropDuplicates", + signature(x = "DataFrame"), + function(x, colNames = columns(x)) { + stopifnot(class(colNames) == "character") + + sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(colNames)) + dataFrame(sdf) + }) + #' Join #' #' Join two DataFrames based on the given join expression. diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 860329988f..d616266ead 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -428,6 +428,13 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") }) #' @export setGeneric("describe", function(x, col, ...) { standardGeneric("describe") }) +#' @rdname dropduplicates +#' @export +setGeneric("dropDuplicates", + function(x, colNames = columns(x)) { + standardGeneric("dropDuplicates") + }) + #' @rdname nafunctions #' @export setGeneric("dropna", diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 67ecdbc522..6610734cf4 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -734,7 +734,7 @@ test_that("head() and first() return the correct data", { expect_equal(ncol(testFirst), 2) }) -test_that("distinct() and unique on DataFrames", { +test_that("distinct(), unique() and dropDuplicates() on DataFrames", { lines <- c("{\"name\":\"Michael\"}", "{\"name\":\"Andy\", \"age\":30}", "{\"name\":\"Justin\", \"age\":19}", @@ -750,6 +750,42 @@ test_that("distinct() and unique on DataFrames", { uniques2 <- unique(df) expect_is(uniques2, "DataFrame") expect_equal(count(uniques2), 3) + + # Test dropDuplicates() + df <- createDataFrame( + sqlContext, + list( + list(2, 1, 2), list(1, 1, 1), + list(1, 2, 1), list(2, 1, 2), + list(2, 2, 2), list(2, 2, 1), + list(2, 1, 1), list(1, 1, 2), + list(1, 2, 2), list(1, 2, 1)), + schema = c("key", "value1", "value2")) + result <- collect(dropDuplicates(df)) + expected <- rbind.data.frame( + c(1, 1, 1), c(1, 1, 2), c(1, 2, 1), + c(1, 2, 2), c(2, 1, 1), c(2, 1, 2), + c(2, 2, 1), c(2, 2, 2)) + names(expected) <- c("key", "value1", "value2") + expect_equivalent( + result[order(result$key, result$value1, result$value2),], + expected) + + result <- collect(dropDuplicates(df, c("key", "value1"))) + expected <- rbind.data.frame( + c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2)) + names(expected) <- c("key", "value1", "value2") + expect_equivalent( + result[order(result$key, result$value1, result$value2),], + expected) + + result <- collect(dropDuplicates(df, "key")) + expected <- rbind.data.frame( + c(1, 1, 1), c(2, 1, 2)) + names(expected) <- c("key", "value1", "value2") + expect_equivalent( + result[order(result$key, result$value1, result$value2),], + expected) }) test_that("sample on a DataFrame", { -- cgit v1.2.3