aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
author: Sun Rui <rui.sun@intel.com> 2016-01-20 21:08:15 -0800
committer: Shivaram Venkataraman <shivaram@cs.berkeley.edu> 2016-01-20 21:08:15 -0800
commit: 1b2a918e59addcdccdf8e011bce075cc9dd07b93 (patch)
tree: a4fe927c31db877acf9af143e88081b95633891b /R
parent: d7415991a1c65f44ba385bc697b458125366523f (diff)
download: spark-1b2a918e59addcdccdf8e011bce075cc9dd07b93.tar.gz
spark-1b2a918e59addcdccdf8e011bce075cc9dd07b93.tar.bz2
spark-1b2a918e59addcdccdf8e011bce075cc9dd07b93.zip
[SPARK-12204][SPARKR] Implement drop method for DataFrame in SparkR.
Author: Sun Rui <rui.sun@intel.com> Closes #10201 from sun-rui/SPARK-12204.
Diffstat (limited to 'R')
-rw-r--r-- R/pkg/NAMESPACE 1
-rw-r--r-- R/pkg/R/DataFrame.R 77
-rw-r--r-- R/pkg/R/generics.R 4
-rw-r--r-- R/pkg/inst/tests/testthat/test_context.R 2
-rw-r--r-- R/pkg/inst/tests/testthat/test_sparkSQL.R 31
5 files changed, 88 insertions, 27 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 00634c1a70..2cc1544bef 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
"describe",
"dim",
"distinct",
+ "drop",
"dropDuplicates",
"dropna",
"dtypes",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 629c1ce2ed..4653a73e11 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
- cols <- columns(x)
- if (name %in% cols) {
- if (is.null(value)) {
- cols <- Filter(function(c) { c != name }, cols)
- }
- cols <- lapply(cols, function(c) {
- if (c == name) {
- alias(value, name)
- } else {
- col(c)
- }
- })
- nx <- select(x, cols)
+
+ if (is.null(value)) {
+ nx <- drop(x, name)
} else {
- if (is.null(value)) {
- return(x)
- }
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",
#' WithColumn
#'
-#' Return a new DataFrame with the specified column added.
+#' Return a new DataFrame by adding a column or replacing the existing column
+#' that has the same name.
#'
#' @param x A DataFrame
-#' @param colName A string containing the name of the new column.
+#' @param colName A column name.
#' @param col A Column expression.
-#' @return A DataFrame with the new column added.
+#' @return A DataFrame with the new column added or the existing column replaced.
#' @family DataFrame functions
#' @rdname withColumn
#' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' # Replace an existing column
+#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' }
setMethod("withColumn",
signature(x = "DataFrame", colName = "character", col = "Column"),
function(x, colName, col) {
- select(x, x$"*", alias(col, colName))
+ sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
+ dataFrame(sdf)
})
+
#' Mutate
#'
#' Return a new DataFrame with the specified columns added.
@@ -2401,4 +2393,47 @@ setMethod("str",
cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
}
}
- }) \ No newline at end of file
+ })
+
+#' drop
+#'
+#' Returns a new DataFrame with the given column(s) dropped.
+#' This is a no-op if schema doesn't contain column name(s).
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param col A character vector of column names or a Column.
+#' @return A DataFrame without the dropped column(s).
+#'
+#' @family DataFrame functions
+#' @rdname drop
+#' @name drop
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlCtx, path)
+#' drop(df, "col1")
+#' drop(df, c("col1", "col2"))
+#' drop(df, df$col1)
+#' }
+setMethod("drop",
+          signature(x = "DataFrame"),
+          function(x, col) {
+            stopifnot(is.character(col) || inherits(col, "Column"))
+            # A Column is dropped through its jc slot; column names are sent as a list.
+            if (inherits(col, "Column")) {
+              sdf <- callJMethod(x@sdf, "drop", col@jc)
+            } else {
+              sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            }
+            dataFrame(sdf)
+          })
+
+# Expose base::drop: an x matching no more specific method is handed to base::drop as is.
+setMethod("drop",
+ signature(x = "ANY"),
+ function(x) {
+ base::drop(x)
+ })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d616266ead..9a8ab97bb8 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
+#' @rdname drop
+#' @export
+setGeneric("drop", function(x, ...) { standardGeneric("drop") })
+
#' @rdname dropduplicates
#' @export
setGeneric("dropDuplicates",
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3b14a497b4..ad3f9722a4 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
- "summary", "transform")
+ "summary", "transform", "drop")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index a389dd71a2..e59841ab9f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -824,11 +824,6 @@ test_that("select operators", {
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
-
- df$age2 <- NULL
- expect_equal(columns(df), c("name", "age"))
- df$age3 <- NULL
- expect_equal(columns(df), c("name", "age"))
})
test_that("select with column", {
@@ -854,6 +849,27 @@ test_that("select with column", {
"To select multiple columns, use a character vector or list for col")
})
+test_that("drop column", {
+ df <- select(read.json(sqlContext, jsonPath), "name", "age")
+ df1 <- drop(df, "name")  # drop a single column by name
+ expect_equal(columns(df1), c("age"))
+ # Drop several columns named in one character vector
+ df$age2 <- df$age
+ df1 <- drop(df, c("name", "age"))
+ expect_equal(columns(df1), c("age2"))
+ # Drop a column given as a Column object instead of a name
+ df1 <- drop(df, df$age)
+ expect_equal(columns(df1), c("name", "age2"))
+ # Assigning NULL removes an existing column; for an unknown name the columns stay the same
+ df$age2 <- NULL
+ expect_equal(columns(df), c("name", "age"))
+ df$age3 <- NULL
+ expect_equal(columns(df), c("name", "age"))
+
+ # base::drop must still work: the 1x1 matrix product below collapses to the scalar 20
+ expect_equal(drop(1:3 %*% 2:4), 20)
+})
+
test_that("subsetting", {
# read.json returns columns in random order
df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -1462,6 +1478,11 @@ test_that("withColumn() and withColumnRenamed()", {
expect_equal(columns(newDF)[3], "newAge")
expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
+ # Replace existing column
+ newDF <- withColumn(df, "age", df$age + 2)
+ expect_equal(length(columns(newDF)), 2)
+ expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
+
newDF2 <- withColumnRenamed(df, "age", "newerAge")
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")