author | Narine Kokhlikyan <narine.kokhlikyan@gmail.com> | 2016-06-15 21:42:05 -0700
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2016-06-15 21:42:05 -0700
commit | 7c6c6926376c93acc42dd56a399d816f4838f28c (patch)
tree | bbf8f9dc1d7a044b890b6c95fdd3a17aa76fea89 /R
parent | b75f454f946714b93fe561055cd53b0686187d2e (diff)
download | spark-7c6c6926376c93acc42dd56a399d816f4838f28c.tar.gz, spark-7c6c6926376c93acc42dd56a399d816f4838f28c.tar.bz2, spark-7c6c6926376c93acc42dd56a399d816f4838f28c.zip
[SPARK-12922][SPARKR][WIP] Implement gapply() on DataFrame in SparkR
## What changes were proposed in this pull request?
gapply() applies an R function to each group of a DataFrame, where groups are defined by one or more columns, and returns a DataFrame. It is analogous to GroupedDataSet.flatMapGroups() in the Dataset API.
Please let me know what you think and whether you have any ideas to improve it.
Thank you!
## How was this patch tested?
Unit tests.
1. Primitive test with different column types
2. Add a boolean column
3. Compute average by a group
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Author: NarineK <narine.kokhlikyan@us.ibm.com>
Closes #12836 from NarineK/gapply2.
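For orientation before the diff, here is a minimal usage sketch distilled from the examples documented in this patch. The data, schema, and function are taken from the patch's own roxygen example; only the SparkR session setup is assumed and not shown:

```r
library(SparkR)
# Assumes an already-initialized SparkR session / SQL context.

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))

# The schema must describe the data.frame returned by the function below:
# one row per group, keyed by columns "a" and "c", plus the mean of "b".
schema <- structType(structField("a", "integer"),
                     structField("c", "string"),
                     structField("avg", "double"))

df1 <- gapply(df, list("a", "c"),
              function(key, x) {
                # key holds the grouping values; x is the group's rows
                data.frame(key, mean(x$b), stringsAsFactors = FALSE)
              },
              schema)
collect(df1)  # rows: (3, "3", 3.0) and (1, "1", 1.5)
```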
Diffstat (limited to 'R')
-rw-r--r-- | R/pkg/NAMESPACE | 1
-rw-r--r-- | R/pkg/R/DataFrame.R | 82
-rw-r--r-- | R/pkg/R/deserialize.R | 30
-rw-r--r-- | R/pkg/R/generics.R | 4
-rw-r--r-- | R/pkg/R/group.R | 62
-rw-r--r-- | R/pkg/inst/tests/testthat/test_sparkSQL.R | 65
-rw-r--r-- | R/pkg/inst/worker/worker.R | 138
7 files changed, 333 insertions, 49 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a8cf53fd46..8db4d5ca1e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -62,6 +62,7 @@ exportMethods("arrange",
               "filter",
               "first",
               "freqItems",
+              "gapply",
               "group_by",
               "groupBy",
               "head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0ff350d44d..9a9b3f7eca 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1181,7 +1181,7 @@ dapplyInternal <- function(x, func, schema) {
 #'             func should have only one parameter, to which a data.frame corresponds
 #'             to each partition will be passed.
 #'             The output of func should be a data.frame.
-#' @param schema The schema of the resulting DataFrame after the function is applied.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
 #'               It must match the output of func.
 #' @family SparkDataFrame functions
 #' @rdname dapply
@@ -1267,6 +1267,86 @@ setMethod("dapplyCollect",
             ldf
           })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified by the grouping
+#'             columns of the SparkDataFrame. The function `func` takes as arguments
+#'             a key (the grouping columns) and a local R data.frame.
+#'             The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               The schema must match the output of `func`. It has to be defined for each
+#'               output column, with the preferred output column name and corresponding data type.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Outputs the grouping values and the average.
+#'
+#' df <- createDataFrame(
+#'   list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' Here our output contains three columns: the key, which is a combination of two
+#' columns with data types integer and string, and the mean, which is a double.
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#'   schema)
+#' collect(df1)
+#'
+#' Result
+#' ------
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on the iris dataset by grouping on the 'Species' column,
+#' using 'Sepal_Length' as the target variable and 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame(iris)
+#' schema <- structType(structField("(Intercept)", "double"),
+#'   structField("Sepal_Width", "double"), structField("Petal_Length", "double"),
+#'   structField("Petal_Width", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list(df$"Species"),
+#'   function(key, x) {
+#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'       Sepal_Width + Petal_Length + Petal_Width, x))
+#'     data.frame(t(coef(m)))
+#'   }, schema)
+#' collect(df1)
+#'
+#' Result
+#' ------
+#' Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
+#' 1         0.699883    0.3303370     0.9455356   -0.1697527
+#' 2         1.895540    0.3868576     0.9083370   -0.6792238
+#' 3         2.351890    0.6548350     0.2375602    0.2521257
+#'
+#' }
+setMethod("gapply",
+          signature(x = "SparkDataFrame"),
+          function(x, cols, func, schema) {
+            grouped <- do.call("groupBy", c(x, cols))
+            gapply(grouped, func, schema)
+          })
+
 ############################## RDD Map Functions ##################################
 # All of the following functions mirror the existing RDD map functions,           #
 # but allow for use with DataFrames by first converting to an RRDD before calling #
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index ce071b1a84..0e99b171ca 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -197,6 +197,36 @@ readMultipleObjects <- function(inputCon) {
   data # this is a list of named lists now
 }
 
+readMultipleObjectsWithKeys <- function(inputCon) {
+  # readMultipleObjectsWithKeys reads multiple continuous objects from
+  # a DataOutputStream. There is no preceding field telling the count
+  # of the objects, so the number of objects varies; we try to read
+  # all objects in a loop until the end of the stream. This function
+  # is for use by gapply. Each group of rows is followed by the grouping
+  # key for that group, which is then followed by the next group.
+  keys <- list()
+  data <- list()
+  subData <- list()
+  while (TRUE) {
+    # If reaching the end of the stream, the type returned should be "".
+    type <- readType(inputCon)
+    if (type == "") {
+      break
+    } else if (type == "r") {
+      type <- readType(inputCon)
+      # A grouping boundary detected
+      key <- readTypedObject(inputCon, type)
+      index <- length(data) + 1L
+      data[[index]] <- subData
+      keys[[index]] <- key
+      subData <- list()
+    } else {
+      subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
+    }
+  }
+  list(keys = keys, data = data) # this is a list of keys and corresponding data
+}
+
 readRowList <- function(obj) {
   # readRowList is meant for use inside an lapply. As a result, it is
   # necessary to open a standalone connection for the row and consume
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 50fc204f99..40a96d8991 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -454,6 +454,10 @@ setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
 #' @export
 setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect") })
 
+#' @rdname gapply
+#' @export
+setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 08f4a490c8..b704776917 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -142,3 +142,65 @@ createMethods <- function() {
 }
 
 createMethods()
+
+#' gapply
+#'
+#' Applies an R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified by GroupedData.
+#'             The function `func` takes as arguments a key (the grouping columns) and
+#'             a local R data.frame.
+#'             The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               The schema must match the output of `func`. It has to be defined for each
+#'               output column, with the preferred output column name and corresponding data type.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Outputs the grouping values and the average.
+#'
+#' df <- createDataFrame(
+#'   list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' Here our output contains three columns: the key, which is a combination of two
+#' columns with data types integer and string, and the mean, which is a double.
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#'   schema)
+#' collect(df1)
+#'
+#' Result
+#' ------
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#' }
+setMethod("gapply",
+          signature(x = "GroupedData"),
+          function(x, func, schema) {
+            try(if (is.null(schema)) stop("schema cannot be NULL"))
+            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+                                         connection = NULL)
+            broadcastArr <- lapply(ls(.broadcastNames),
+                                   function(name) { get(name, .broadcastNames) })
+            sdf <- callJStatic(
+                     "org.apache.spark.sql.api.r.SQLUtils",
+                     "gapply",
+                     x@sgd,
+                     serialize(cleanClosure(func), connection = NULL),
+                     packageNamesArr,
+                     broadcastArr,
+                     schema$jobj)
+            dataFrame(sdf)
+          })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d1ca3b726f..c11930ada6 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2146,6 +2146,71 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })
 
+test_that("gapply() on a DataFrame", {
+  df <- createDataFrame(
+    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+    c("a", "b", "c", "d"))
+  expected <- collect(df)
+  df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df))
+  actual <- collect(df1)
+  expect_identical(actual, expected)
+
+  # Computes the sum of the second column by grouping on the first and third columns
+  # and checks if the sum is larger than 2
+  schema <- structType(structField("a", "integer"), structField("e", "boolean"))
+  df2 <- gapply(
+    df,
+    list(df$"a", df$"c"),
+    function(key, x) {
+      y <- data.frame(key[1], sum(x$b) > 2)
+    },
+    schema)
+  actual <- collect(df2)$e
+  expected <- c(TRUE, TRUE)
+  expect_identical(actual, expected)
+
+  # Computes the arithmetic mean of the second column by grouping
+  # on the first and third columns. Outputs the grouping value and the average.
+  schema <- structType(structField("a", "integer"), structField("c", "string"),
+                       structField("avg", "double"))
+  df3 <- gapply(
+    df,
+    list("a", "c"),
+    function(key, x) {
+      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+    },
+    schema)
+  actual <- collect(df3)
+  actual <- actual[order(actual$a), ]
+  rownames(actual) <- NULL
+  expected <- collect(select(df, "a", "b", "c"))
+  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+  colnames(expected) <- c("a", "c", "avg")
+  expected <- expected[order(expected$a), ]
+  rownames(expected) <- NULL
+  expect_identical(actual, expected)
+
+  irisDF <- suppressWarnings(createDataFrame(iris))
+  schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+  # Groups by `Sepal_Length` and computes the average of `Sepal_Width`
+  df4 <- gapply(
+    cols = list("Sepal_Length"),
+    irisDF,
+    function(key, x) {
+      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+    },
+    schema)
+  actual <- collect(df4)
+  actual <- actual[order(actual$Sepal_Length), ]
+  rownames(actual) <- NULL
+  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
+                             stringsAsFactors = FALSE)
+  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+  expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+  rownames(expected) <- NULL
+  expect_identical(actual, expected)
+})
+
 test_that("Window functions on a DataFrame", {
   setHiveContext(sc)
   df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index 40cda0c5ef..debf018018 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -27,6 +27,54 @@ elapsedSecs <- function() {
   proc.time()[3]
 }
 
+compute <- function(mode, partition, serializer, deserializer, key,
+                    colNames, computeFunc, inputData) {
+  if (mode > 0) {
+    if (deserializer == "row") {
+      # Transform the list of rows into a data.frame
+      # Note that the optional argument stringsAsFactors for rbind is
+      # available since R 3.2.4. So we set the global option here.
+      oldOpt <- getOption("stringsAsFactors")
+      options(stringsAsFactors = FALSE)
+      inputData <- do.call(rbind.data.frame, inputData)
+      options(stringsAsFactors = oldOpt)
+
+      names(inputData) <- colNames
+    } else {
+      # Check to see if inputData is a valid data.frame
+      stopifnot(deserializer == "byte")
+      stopifnot(class(inputData) == "data.frame")
+    }
+
+    if (mode == 2) {
+      output <- computeFunc(key, inputData)
+    } else {
+      output <- computeFunc(inputData)
+    }
+    if (serializer == "row") {
+      # Transform the result data.frame back to a list of rows
+      output <- split(output, seq(nrow(output)))
+    } else {
+      # Serialize the output to a byte array
+      stopifnot(serializer == "byte")
+    }
+  } else {
+    output <- computeFunc(partition, inputData)
+  }
+  return (output)
+}
+
+outputResult <- function(serializer, output, outputCon) {
+  if (serializer == "byte") {
+    SparkR:::writeRawSerialize(outputCon, output)
+  } else if (serializer == "row") {
+    SparkR:::writeRowSerialize(outputCon, output)
+  } else {
+    # write lines one-by-one with flag
+    lapply(output, function(line) SparkR:::writeString(outputCon, line))
+  }
+}
+
 # Constants
 specialLengths <- list(END_OF_STERAM = 0L, TIMING_DATA = -1L)
 
@@ -79,75 +127,71 @@ if (numBroadcastVars > 0) {
 # Timing broadcast
 broadcastElap <- elapsedSecs()
 
+# Initial input timing
+inputElap <- broadcastElap
+
 # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
     if (deserializer == "byte") {
       # Now read as many characters as described in funcLen
      data <- SparkR:::readDeserialize(inputCon)
     } else if (deserializer == "string") {
       data <- as.list(readLines(inputCon))
+    } else if (deserializer == "row" && mode == 2) {
+      dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+      keys <- dataWithKeys$keys
+      data <- dataWithKeys$data
     } else if (deserializer == "row") {
       data <- SparkR:::readMultipleObjects(inputCon)
     }
+
     # Timing reading input data for execution
     inputElap <- elapsedSecs()
-
-    if (isDataFrame) {
-      if (deserializer == "row") {
-        # Transform the list of rows into a data.frame
-        # Note that the optional argument stringsAsFactors for rbind is
-        # available since R 3.2.4. So we set the global option here.
-        oldOpt <- getOption("stringsAsFactors")
-        options(stringsAsFactors = FALSE)
-        data <- do.call(rbind.data.frame, data)
-        options(stringsAsFactors = oldOpt)
-
-        names(data) <- colNames
-      } else {
-        # Check to see if data is a valid data.frame
-        stopifnot(deserializer == "byte")
-        stopifnot(class(data) == "data.frame")
-      }
-      output <- computeFunc(data)
-      if (serializer == "row") {
-        # Transform the result data.frame back to a list of rows
-        output <- split(output, seq(nrow(output)))
-      } else {
-        # Serialize the ouput to a byte array
-        stopifnot(serializer == "byte")
+    if (mode > 0) {
+      if (mode == 1) {
+        output <- compute(mode, partition, serializer, deserializer, NULL,
+                          colNames, computeFunc, data)
+      } else {
+        # gapply mode
+        for (i in 1:length(data)) {
+          # Timing reading input data for execution
+          inputElap <- elapsedSecs()
+          output <- compute(mode, partition, serializer, deserializer, keys[[i]],
+                            colNames, computeFunc, data[[i]])
+          computeElap <- elapsedSecs()
+          outputResult(serializer, output, outputCon)
+          outputElap <- elapsedSecs()
+          computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
+          outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
+        }
+      }
     } else {
-      output <- computeFunc(partition, data)
+      output <- compute(mode, partition, serializer, deserializer, NULL,
+                        colNames, computeFunc, data)
     }
-
-    # Timing computing
-    computeElap <- elapsedSecs()
-
-    if (serializer == "byte") {
-      SparkR:::writeRawSerialize(outputCon, output)
-    } else if (serializer == "row") {
-      SparkR:::writeRowSerialize(outputCon, output)
-    } else {
-      # write lines one-by-one with flag
-      lapply(output, function(line) SparkR:::writeString(outputCon, line))
+    if (mode != 2) {
+      # Not a gapply mode
+      computeElap <- elapsedSecs()
+      outputResult(serializer, output, outputCon)
+      outputElap <- elapsedSecs()
+      computeInputElapsDiff <- computeElap - inputElap
+      outputComputeElapsDiff <- outputElap - computeElap
     }
-    # Timing output
-    outputElap <- elapsedSecs()
   } else {
     if (deserializer == "byte") {
       # Now read as many characters as described in funcLen
@@ -189,11 +233,9 @@ if (isEmpty != 0) {
     }
     # Timing output
     outputElap <- elapsedSecs()
+    computeInputElapsDiff <- computeElap - inputElap
+    outputComputeElapsDiff <- outputElap - computeElap
   }
-} else {
-  inputElap <- broadcastElap
-  computeElap <- broadcastElap
-  outputElap <- broadcastElap
 }
 
 # Report timing
@@ -202,8 +244,8 @@ SparkR:::writeDouble(outputCon, bootTime)
 SparkR:::writeDouble(outputCon, initElap - bootElap)        # init
 SparkR:::writeDouble(outputCon, broadcastElap - initElap)   # broadcast
 SparkR:::writeDouble(outputCon, inputElap - broadcastElap)  # input
-SparkR:::writeDouble(outputCon, computeElap - inputElap)    # compute
-SparkR:::writeDouble(outputCon, outputElap - computeElap)   # output
+SparkR:::writeDouble(outputCon, computeInputElapsDiff)      # compute
+SparkR:::writeDouble(outputCon, outputComputeElapsDiff)     # output
 
 # End of output
 SparkR:::writeInt(outputCon, specialLengths$END_OF_STERAM)
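For intuition about the gapply branch added to worker.R above, here is a self-contained sketch in plain R of the per-group loop, with the socket deserialization replaced by in-memory stand-ins. The names `keys`, `groups`, `func`, and `results` are illustrative only, not the worker's actual internals:

```r
# Stand-ins for what readMultipleObjectsWithKeys() yields: one key per
# group, and each group's rows already assembled into a data.frame.
keys   <- list(1L, 3L)
groups <- list(data.frame(b = c(1, 2)), data.frame(b = 3))

# User function, as in gapply(): receives the group key and the group's rows.
func <- function(key, x) data.frame(a = key, avg = mean(x$b))

# Mirrors the worker's mode == 2 branch: compute and emit one group at a
# time (here we simply collect results instead of writing to outputCon).
results <- vector("list", length(groups))
for (i in seq_along(groups)) {
  results[[i]] <- func(keys[[i]], groups[[i]])
}
do.call(rbind, results)
#   a avg
# 1 1 1.5
# 2 3 3.0
```

Emitting each group's output as soon as it is computed, rather than accumulating all groups first, is what lets the worker keep only one group's result in memory at a time; the per-group timing deltas are summed into `computeInputElapsDiff` and `outputComputeElapsDiff` for the same reason.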