aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorSun Rui <rui.sun@intel.com>2015-05-31 15:01:21 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-05-31 15:01:59 -0700
commit46576ab303e50c54c3bd464f8939953efe644574 (patch)
tree4d5da771f4dd584b7b023a559553555d1aeb6503 /R
parent866652c903d06d1cb4356283e0741119d84dcc21 (diff)
downloadspark-46576ab303e50c54c3bd464f8939953efe644574.tar.gz
spark-46576ab303e50c54c3bd464f8939953efe644574.tar.bz2
spark-46576ab303e50c54c3bd464f8939953efe644574.zip
[SPARK-7227] [SPARKR] Support fillna / dropna in R DataFrame.
Author: Sun Rui <rui.sun@intel.com> Closes #6183 from sun-rui/SPARK-7227 and squashes the following commits: dd6f5b3 [Sun Rui] Rename readEnv() back to readMap(). Add alias na.omit() for dropna(). 41cf725 [Sun Rui] [SPARK-7227][SPARKR] Support fillna / dropna in R DataFrame.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/NAMESPACE2
-rw-r--r--R/pkg/R/DataFrame.R125
-rw-r--r--R/pkg/R/generics.R18
-rw-r--r--R/pkg/R/serialize.R10
-rw-r--r--R/pkg/inst/tests/test_sparkSQL.R109
5 files changed, 263 insertions, 1 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 411126a377..f9447f6c32 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -19,9 +19,11 @@ exportMethods("arrange",
"count",
"describe",
"distinct",
+ "dropna",
"dtypes",
"except",
"explain",
+ "fillna",
"filter",
"first",
"group_by",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index e79d324838..0af5cb8881 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1429,3 +1429,128 @@ setMethod("describe",
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})
+
+#' dropna
+#'
+#' Returns a new DataFrame omitting rows with null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param how "any" or "all".
+#' if "any", drop a row if it contains any nulls.
+#' if "all", drop a row only if all its values are null.
+#' if minNonNulls is specified, how is ignored.
+#' @param minNonNulls If specified, drop rows that have less than
+#' minNonNulls non-null values.
+#' This overwrites the how parameter.
+#' @param cols Optional list of column names to consider.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dropna(df)
+#' }
+setMethod("dropna",
+ signature(x = "DataFrame"),
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ how <- match.arg(how)
+ if (is.null(cols)) {
+ cols <- columns(x)
+ }
+ if (is.null(minNonNulls)) {
+ minNonNulls <- if (how == "any") { length(cols) } else { 1 }
+ }
+
+ naFunctions <- callJMethod(x@sdf, "na")
+ sdf <- callJMethod(naFunctions, "drop",
+ as.integer(minNonNulls), listToSeq(as.list(cols)))
+ dataFrame(sdf)
+ })
+
+#' @aliases dropna
+#' @export
+setMethod("na.omit",
+ signature(x = "DataFrame"),
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ dropna(x, how, minNonNulls, cols)
+ })
+
+#' fillna
+#'
+#' Replace null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param value Value to replace null values with.
+#' Should be an integer, numeric, character or named list.
+#' If the value is a named list, then cols is ignored and
+#' value must be a mapping from column name (character) to
+#' replacement value. The replacement value must be an
+#' integer, numeric or character.
+#' @param cols optional list of column names to consider.
+#' Columns specified in cols that do not have matching data
+#' type are ignored. For example, if value is a character, and
+#' subset contains a non-character column, then the non-character
+#' column is simply ignored.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' fillna(df, 1)
+#' fillna(df, list("age" = 20, "name" = "unknown"))
+#' }
+setMethod("fillna",
+ signature(x = "DataFrame"),
+ function(x, value, cols = NULL) {
+ if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
+ stop("value should be an integer, numeric, charactor or named list.")
+ }
+
+ if (class(value) == "list") {
+ # Check column names in the named list
+ colNames <- names(value)
+ if (length(colNames) == 0 || !all(colNames != "")) {
+ stop("value should be an a named list with each name being a column name.")
+ }
+
+ # Convert to the named list to an environment to be passed to JVM
+ valueMap <- new.env()
+ for (col in colNames) {
+ # Check each item in the named list is of valid type
+ v <- value[[col]]
+ if (!(class(v) %in% c("integer", "numeric", "character"))) {
+ stop("Each item in value should be an integer, numeric or charactor.")
+ }
+ valueMap[[col]] <- v
+ }
+
+ # When value is a named list, caller is expected not to pass in cols
+ if (!is.null(cols)) {
+ warning("When value is a named list, cols is ignored!")
+ cols <- NULL
+ }
+
+ value <- valueMap
+ } else if (is.integer(value)) {
+ # Cast an integer to a numeric
+ value <- as.numeric(value)
+ }
+
+ naFunctions <- callJMethod(x@sdf, "na")
+ sdf <- if (length(cols) == 0) {
+ callJMethod(naFunctions, "fill", value)
+ } else {
+ callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
+ }
+ dataFrame(sdf)
+ })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 1f4fc6adac..12e09176c9 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
+#' @rdname nafunctions
+#' @export
+setGeneric("dropna",
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ standardGeneric("dropna")
+ })
+
+#' @rdname nafunctions
+#' @export
+setGeneric("na.omit",
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ standardGeneric("na.omit")
+ })
+
#' @rdname schema
#' @export
setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
@@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @export
setGeneric("except", function(x, y) { standardGeneric("except") })
+#' @rdname nafunctions
+#' @export
+setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
+
#' @rdname filter
#' @export
setGeneric("filter", function(x, condition) { standardGeneric("filter") })
diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R
index c53d0a9610..2081786e6f 100644
--- a/R/pkg/R/serialize.R
+++ b/R/pkg/R/serialize.R
@@ -160,6 +160,14 @@ writeList <- function(con, arr) {
}
}
+# Used to pass arrays where the elements can be of different types
+writeGenericList <- function(con, list) {
+ writeInt(con, length(list))
+ for (elem in list) {
+ writeObject(con, elem)
+ }
+}
+
# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
len <- length(env)
@@ -168,7 +176,7 @@ writeEnv <- function(con, env) {
if (len > 0) {
writeList(con, as.list(ls(env)))
vals <- lapply(ls(env), function(x) { env[[x]] })
- writeList(con, as.list(vals))
+ writeGenericList(con, as.list(vals))
}
}
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 1857e636e8..d2d82e791e 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -32,6 +32,15 @@ jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
writeLines(mockLines, jsonPath)
+# For test nafunctions, like dropna(), fillna(),...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}",
+ "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+ "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
test_that("infer types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
@@ -765,5 +774,105 @@ test_that("describe() on a DataFrame", {
expect_equal(collect(stats)[5, "age"], "30")
})
+test_that("dropna() on a DataFrame", {
+ df <- jsonFile(sqlContext, jsonPathNa)
+ rows <- collect(df)
+
+ # drop with columns
+
+ expected <- rows[!is.na(rows$name),]
+ actual <- collect(dropna(df, cols = "name"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age),]
+ actual <- collect(dropna(df, cols = "age"))
+ row.names(expected) <- row.names(actual)
+ # identical on two dataframes does not work here. Don't know why.
+ # use identical on all columns as a workaround.
+ expect_true(identical(expected$age, actual$age))
+ expect_true(identical(expected$height, actual$height))
+ expect_true(identical(expected$name, actual$name))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+ actual <- collect(dropna(df, cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df))
+ expect_true(identical(expected, actual))
+
+ # drop with how
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
+ actual <- collect(dropna(df, "all"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df, "any"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+ actual <- collect(dropna(df, "any", cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
+ actual <- collect(dropna(df, "all", cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ # drop with threshold
+
+ expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
+ actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[as.integer(!is.na(rows$age)) +
+ as.integer(!is.na(rows$height)) +
+ as.integer(!is.na(rows$name)) >= 3,]
+ actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
+ expect_true(identical(expected, actual))
+})
+
+test_that("fillna() on a DataFrame", {
+ df <- jsonFile(sqlContext, jsonPathNa)
+ rows <- collect(df)
+
+ # fill with value
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ expected$height[is.na(expected$height)] <- 50.6
+ actual <- collect(fillna(df, 50.6))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, "unknown"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ actual <- collect(fillna(df, 50.6, "age"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, "unknown", c("age", "name")))
+ expect_true(identical(expected, actual))
+
+ # fill with named list
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ expected$height[is.na(expected$height)] <- 50.6
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
+ expect_true(identical(expected, actual))
+})
+
unlink(parquetPath)
unlink(jsonPath)
+unlink(jsonPathNa)