aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/NAMESPACE2
-rw-r--r--R/pkg/R/DataFrame.R125
-rw-r--r--R/pkg/R/generics.R18
-rw-r--r--R/pkg/R/serialize.R10
-rw-r--r--R/pkg/inst/tests/test_sparkSQL.R109
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/SerDe.scala6
6 files changed, 267 insertions, 3 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 411126a377..f9447f6c32 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -19,9 +19,11 @@ exportMethods("arrange",
"count",
"describe",
"distinct",
+ "dropna",
"dtypes",
"except",
"explain",
+ "fillna",
"filter",
"first",
"group_by",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index e79d324838..0af5cb8881 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1429,3 +1429,128 @@ setMethod("describe",
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})
+
+#' dropna
+#'
+#' Returns a new DataFrame omitting rows with null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param how "any" or "all".
+#' if "any", drop a row if it contains any nulls.
+#' if "all", drop a row only if all its values are null.
+#' if minNonNulls is specified, how is ignored.
+#' @param minNonNulls If specified, drop rows that have less than
+#' minNonNulls non-null values.
+#' This overwrites the how parameter.
+#' @param cols Optional list of column names to consider.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dropna(df)
+#' }
+setMethod("dropna",
+ signature(x = "DataFrame"),
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ how <- match.arg(how)
+ if (is.null(cols)) {
+ cols <- columns(x)
+ }
+ if (is.null(minNonNulls)) {
+ minNonNulls <- if (how == "any") { length(cols) } else { 1 }
+ }
+
+ naFunctions <- callJMethod(x@sdf, "na")
+ sdf <- callJMethod(naFunctions, "drop",
+ as.integer(minNonNulls), listToSeq(as.list(cols)))
+ dataFrame(sdf)
+ })
+
+#' @aliases dropna
+#' @export
+setMethod("na.omit",
+ signature(x = "DataFrame"),
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ dropna(x, how, minNonNulls, cols)
+ })
+
+#' fillna
+#'
+#' Replace null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param value Value to replace null values with.
+#' Should be an integer, numeric, character or named list.
+#' If the value is a named list, then cols is ignored and
+#' value must be a mapping from column name (character) to
+#' replacement value. The replacement value must be an
+#' integer, numeric or character.
+#' @param cols optional list of column names to consider.
+#' Columns specified in cols that do not have matching data
+#' type are ignored. For example, if value is a character, and
+#' subset contains a non-character column, then the non-character
+#' column is simply ignored.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' fillna(df, 1)
+#' fillna(df, list("age" = 20, "name" = "unknown"))
+#' }
+setMethod("fillna",
+ signature(x = "DataFrame"),
+ function(x, value, cols = NULL) {
+ if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
+ stop("value should be an integer, numeric, charactor or named list.")
+ }
+
+ if (class(value) == "list") {
+ # Check column names in the named list
+ colNames <- names(value)
+ if (length(colNames) == 0 || !all(colNames != "")) {
+ stop("value should be an a named list with each name being a column name.")
+ }
+
+ # Convert to the named list to an environment to be passed to JVM
+ valueMap <- new.env()
+ for (col in colNames) {
+ # Check each item in the named list is of valid type
+ v <- value[[col]]
+ if (!(class(v) %in% c("integer", "numeric", "character"))) {
+ stop("Each item in value should be an integer, numeric or charactor.")
+ }
+ valueMap[[col]] <- v
+ }
+
+ # When value is a named list, caller is expected not to pass in cols
+ if (!is.null(cols)) {
+ warning("When value is a named list, cols is ignored!")
+ cols <- NULL
+ }
+
+ value <- valueMap
+ } else if (is.integer(value)) {
+ # Cast an integer to a numeric
+ value <- as.numeric(value)
+ }
+
+ naFunctions <- callJMethod(x@sdf, "na")
+ sdf <- if (length(cols) == 0) {
+ callJMethod(naFunctions, "fill", value)
+ } else {
+ callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
+ }
+ dataFrame(sdf)
+ })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 1f4fc6adac..12e09176c9 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
+#' @rdname nafunctions
+#' @export
+setGeneric("dropna",
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ standardGeneric("dropna")
+ })
+
+#' @rdname nafunctions
+#' @export
+setGeneric("na.omit",
+ function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+ standardGeneric("na.omit")
+ })
+
#' @rdname schema
#' @export
setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
@@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @export
setGeneric("except", function(x, y) { standardGeneric("except") })
+#' @rdname nafunctions
+#' @export
+setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
+
#' @rdname filter
#' @export
setGeneric("filter", function(x, condition) { standardGeneric("filter") })
diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R
index c53d0a9610..2081786e6f 100644
--- a/R/pkg/R/serialize.R
+++ b/R/pkg/R/serialize.R
@@ -160,6 +160,14 @@ writeList <- function(con, arr) {
}
}
+# Used to pass arrays where the elements can be of different types
+writeGenericList <- function(con, list) {
+ writeInt(con, length(list))
+ for (elem in list) {
+ writeObject(con, elem)
+ }
+}
+
# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
len <- length(env)
@@ -168,7 +176,7 @@ writeEnv <- function(con, env) {
if (len > 0) {
writeList(con, as.list(ls(env)))
vals <- lapply(ls(env), function(x) { env[[x]] })
- writeList(con, as.list(vals))
+ writeGenericList(con, as.list(vals))
}
}
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 1857e636e8..d2d82e791e 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -32,6 +32,15 @@ jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
writeLines(mockLines, jsonPath)
+# For test nafunctions, like dropna(), fillna(),...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}",
+ "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+ "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
test_that("infer types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
@@ -765,5 +774,105 @@ test_that("describe() on a DataFrame", {
expect_equal(collect(stats)[5, "age"], "30")
})
+test_that("dropna() on a DataFrame", {
+ df <- jsonFile(sqlContext, jsonPathNa)
+ rows <- collect(df)
+
+ # drop with columns
+
+ expected <- rows[!is.na(rows$name),]
+ actual <- collect(dropna(df, cols = "name"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age),]
+ actual <- collect(dropna(df, cols = "age"))
+ row.names(expected) <- row.names(actual)
+ # identical on two dataframes does not work here. Don't know why.
+ # use identical on all columns as a workaround.
+ expect_true(identical(expected$age, actual$age))
+ expect_true(identical(expected$height, actual$height))
+ expect_true(identical(expected$name, actual$name))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+ actual <- collect(dropna(df, cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df))
+ expect_true(identical(expected, actual))
+
+ # drop with how
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
+ actual <- collect(dropna(df, "all"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+ actual <- collect(dropna(df, "any"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+ actual <- collect(dropna(df, "any", cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
+ actual <- collect(dropna(df, "all", cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ # drop with threshold
+
+ expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
+ actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
+ expect_true(identical(expected, actual))
+
+ expected <- rows[as.integer(!is.na(rows$age)) +
+ as.integer(!is.na(rows$height)) +
+ as.integer(!is.na(rows$name)) >= 3,]
+ actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
+ expect_true(identical(expected, actual))
+})
+
+test_that("fillna() on a DataFrame", {
+ df <- jsonFile(sqlContext, jsonPathNa)
+ rows <- collect(df)
+
+ # fill with value
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ expected$height[is.na(expected$height)] <- 50.6
+ actual <- collect(fillna(df, 50.6))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, "unknown"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ actual <- collect(fillna(df, 50.6, "age"))
+ expect_true(identical(expected, actual))
+
+ expected <- rows
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, "unknown", c("age", "name")))
+ expect_true(identical(expected, actual))
+
+ # fill with named list
+
+ expected <- rows
+ expected$age[is.na(expected$age)] <- 50
+ expected$height[is.na(expected$height)] <- 50.6
+ expected$name[is.na(expected$name)] <- "unknown"
+ actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
+ expect_true(identical(expected, actual))
+})
+
unlink(parquetPath)
unlink(jsonPath)
+unlink(jsonPathNa)
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 371dfe454d..f8e3f1a790 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -157,9 +157,11 @@ private[spark] object SerDe {
val keysLen = readInt(in)
val keys = (0 until keysLen).map(_ => readTypedObject(in, keysType))
- val valuesType = readObjectType(in)
val valuesLen = readInt(in)
- val values = (0 until valuesLen).map(_ => readTypedObject(in, valuesType))
+ val values = (0 until valuesLen).map(_ => {
+ val valueType = readObjectType(in)
+ readTypedObject(in, valueType)
+ })
mapAsJavaMap(keys.zip(values).toMap)
} else {
new java.util.HashMap[Object, Object]()