author     Narine Kokhlikyan <narine.kokhlikyan@gmail.com>  2016-06-15 21:42:05 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-06-15 21:42:05 -0700
commit     7c6c6926376c93acc42dd56a399d816f4838f28c (patch)
tree       bbf8f9dc1d7a044b890b6c95fdd3a17aa76fea89 /R
parent     b75f454f946714b93fe561055cd53b0686187d2e (diff)
download   spark-7c6c6926376c93acc42dd56a399d816f4838f28c.tar.gz
           spark-7c6c6926376c93acc42dd56a399d816f4838f28c.tar.bz2
           spark-7c6c6926376c93acc42dd56a399d816f4838f28c.zip
[SPARK-12922][SPARKR][WIP] Implement gapply() on DataFrame in SparkR
## What changes were proposed in this pull request?

gapply() applies an R function to the groups of a DataFrame, grouped by one or more of its columns, and returns a DataFrame. It is similar to GroupedDataSet.flatMapGroups() in the Dataset API.

Please let me know what you think and whether you have any ideas to improve it. Thank you!

## How was this patch tested?

Unit tests:
1. Primitive test with different column types
2. Add a boolean column
3. Compute average by a group

Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Author: NarineK <narine.kokhlikyan@us.ibm.com>

Closes #12836 from NarineK/gapply2.
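For reference, a minimal usage sketch of the new API (taken from the roxygen example added below; the data, column names and schema are illustrative):

    df <- createDataFrame(
      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
      c("a", "b", "c", "d"))
    # The output schema must be declared up front and match what func returns.
    schema <- structType(structField("a", "integer"), structField("c", "string"),
                         structField("avg", "double"))
    # func receives the grouping key and a local R data.frame for each group.
    df1 <- gapply(df, list("a", "c"),
                  function(key, x) {
                    data.frame(key, mean(x$b), stringsAsFactors = FALSE)
                  },
                  schema)
    collect(df1)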
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                              1
-rw-r--r--  R/pkg/R/DataFrame.R                         82
-rw-r--r--  R/pkg/R/deserialize.R                       30
-rw-r--r--  R/pkg/R/generics.R                           4
-rw-r--r--  R/pkg/R/group.R                             62
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R   65
-rw-r--r--  R/pkg/inst/worker/worker.R                 138
7 files changed, 333 insertions, 49 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a8cf53fd46..8db4d5ca1e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -62,6 +62,7 @@ exportMethods("arrange",
"filter",
"first",
"freqItems",
+ "gapply",
"group_by",
"groupBy",
"head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0ff350d44d..9a9b3f7eca 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1181,7 +1181,7 @@ dapplyInternal <- function(x, func, schema) {
#' func should have only one parameter, to which a data.frame corresponding
#' to each partition will be passed.
#' The output of func should be a data.frame.
-#' @param schema The schema of the resulting DataFrame after the function is applied.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' It must match the output of func.
#' @family SparkDataFrame functions
#' @rdname dapply
@@ -1267,6 +1267,86 @@ setMethod("dapplyCollect",
ldf
})
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified by the grouping
+#'             columns of the SparkDataFrame. The function `func` takes two arguments:
+#'             a key, i.e. the values of the grouping columns, and a local R data.frame
+#'             containing the rows of that group.
+#'             The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               The schema must match the output of `func`. It has to be defined for each
+#'               output column with preferred output column name and corresponding data type.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' # Computes the arithmetic mean of the second column by grouping
+#' # on the first and third columns. Outputs the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#' c("a", "b", "c", "d"))
+#'
+#' # Here our output contains three columns: the key, which is a combination of two
+#' # columns with data types integer and string, and the mean, which is a double.
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#' structField("avg", "double"))
+#' df1 <- gapply(
+#' df,
+#' list("a", "c"),
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' },
+#' schema)
+#' collect(df1)
+#'
+#' # Result
+#' # ------
+#' # a c avg
+#' # 3 3 3.0
+#' # 1 1 1.5
+#'
+#' # Fits linear models on the iris dataset by grouping on the 'Species' column,
+#' # using 'Sepal_Length' as the target variable and 'Sepal_Width', 'Petal_Length'
+#' # and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame (iris)
+#' schema <- structType(structField("(Intercept)", "double"),
+#' structField("Sepal_Width", "double"),structField("Petal_Length", "double"),
+#' structField("Petal_Width", "double"))
+#' df1 <- gapply(
+#' df,
+#' list(df$"Species"),
+#' function(key, x) {
+#' m <- suppressWarnings(lm(Sepal_Length ~
+#' Sepal_Width + Petal_Length + Petal_Width, x))
+#' data.frame(t(coef(m)))
+#' }, schema)
+#' collect(df1)
+#'
+#' # Result
+#' # ---------
+#' # Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
+#' # 1       0.699883    0.3303370    0.9455356    -0.1697527
+#' # 2       1.895540    0.3868576    0.9083370    -0.6792238
+#' # 3       2.351890    0.6548350    0.2375602     0.2521257
+#' }
+setMethod("gapply",
+ signature(x = "SparkDataFrame"),
+ function(x, cols, func, schema) {
+ grouped <- do.call("groupBy", c(x, cols))
+ gapply(grouped, func, schema)
+ })
+
############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
# but allow for use with DataFrames by first converting to an RRDD before calling #
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index ce071b1a84..0e99b171ca 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -197,6 +197,36 @@ readMultipleObjects <- function(inputCon) {
data # this is a list of named lists now
}
+readMultipleObjectsWithKeys <- function(inputCon) {
+ # readMultipleObjectsWithKeys reads multiple consecutive objects from
+ # a DataOutputStream. There is no preceding field giving the count
+ # of the objects, so the number of objects varies; we read objects
+ # in a loop until the end of the stream. This function is used by
+ # gapply. Each group of rows is followed by the grouping key for
+ # that group, which is then followed by the next group.
+ keys <- list()
+ data <- list()
+ subData <- list()
+ while (TRUE) {
+ # If reaching the end of the stream, type returned should be "".
+ type <- readType(inputCon)
+ if (type == "") {
+ break
+ } else if (type == "r") {
+ type <- readType(inputCon)
+ # A grouping boundary detected
+ key <- readTypedObject(inputCon, type)
+ index <- length(data) + 1L
+ data[[index]] <- subData
+ keys[[index]] <- key
+ subData <- list()
+ } else {
+ subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
+ }
+ }
+ list(keys = keys, data = data) # this is a list of keys and corresponding data
+}
+
readRowList <- function(obj) {
# readRowList is meant for use inside an lapply. As a result, it is
# necessary to open a standalone connection for the row and consume
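For intuition, the stream consumed by readMultipleObjectsWithKeys above is laid out roughly as follows (a sketch inferred from the loop, not an exact description of the wire format):

    # row 1 of group 1, row 2 of group 1, ...,
    # type marker "r" followed by the key of group 1,
    # row 1 of group 2, row 2 of group 2, ...,
    # type marker "r" followed by the key of group 2,
    # ...
    # end of stream: readType() returns ""

Each "r" marker closes out the rows buffered so far in subData and associates them with the key that immediately follows it.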
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 50fc204f99..40a96d8991 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -454,6 +454,10 @@ setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
#' @export
setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect") })
+#' @rdname gapply
+#' @export
+setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
+
#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 08f4a490c8..b704776917 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -142,3 +142,65 @@ createMethods <- function() {
}
createMethods()
+
+#' gapply
+#'
+#' Applies an R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified by the GroupedData.
+#'             The function `func` takes two arguments: a key, i.e. the values of the
+#'             grouping columns, and a local R data.frame containing the rows of that group.
+#'             The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               The schema must match the output of `func`. It has to be defined for each
+#'               output column with preferred output column name and corresponding data type.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' # Computes the arithmetic mean of the second column by grouping
+#' # on the first and third columns. Outputs the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#' c("a", "b", "c", "d"))
+#'
+#' # Here our output contains three columns: the key, which is a combination of two
+#' # columns with data types integer and string, and the mean, which is a double.
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#' structField("avg", "double"))
+#' df1 <- gapply(
+#' df,
+#' list("a", "c"),
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' },
+#' schema)
+#' collect(df1)
+#'
+#' # Result
+#' # ------
+#' # a c avg
+#' # 3 3 3.0
+#' # 1 1 1.5
+#' }
+setMethod("gapply",
+ signature(x = "GroupedData"),
+ function(x, func, schema) {
+ if (is.null(schema)) stop("schema cannot be NULL")
+ packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+ connection = NULL)
+ broadcastArr <- lapply(ls(.broadcastNames),
+ function(name) { get(name, .broadcastNames) })
+ sdf <- callJStatic(
+ "org.apache.spark.sql.api.r.SQLUtils",
+ "gapply",
+ x@sgd,
+ serialize(cleanClosure(func), connection = NULL),
+ packageNamesArr,
+ broadcastArr,
+ schema$jobj)
+ dataFrame(sdf)
+ })
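Since the SparkDataFrame method added in DataFrame.R simply calls groupBy() and then dispatches to the GroupedData method above, the following two calls should be equivalent (a sketch based on the methods in this diff; df, the aggregation and the schema are the ones from the example):

    # Entry point 1: gapply on a SparkDataFrame with a list of grouping columns
    df1 <- gapply(df, list("a", "c"),
                  function(key, x) { data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
                  schema)

    # Entry point 2: gapply on a GroupedData obtained from groupBy()
    df2 <- gapply(groupBy(df, "a", "c"),
                  function(key, x) { data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
                  schema)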
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d1ca3b726f..c11930ada6 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2146,6 +2146,71 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
+test_that("gapply() on a DataFrame", {
+ df <- createDataFrame (
+ list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+ c("a", "b", "c", "d"))
+ expected <- collect(df)
+ df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df))
+ actual <- collect(df1)
+ expect_identical(actual, expected)
+
+ # Computes the sum of the second column by grouping on the first and third columns
+ # and checks if the sum is larger than 2
+ schema <- structType(structField("a", "integer"), structField("e", "boolean"))
+ df2 <- gapply(
+ df,
+ list(df$"a", df$"c"),
+ function(key, x) {
+ y <- data.frame(key[1], sum(x$b) > 2)
+ },
+ schema)
+ actual <- collect(df2)$e
+ expected <- c(TRUE, TRUE)
+ expect_identical(actual, expected)
+
+ # Computes the arithmetic mean of the second column by grouping
+ # on the first and third columns. Outputs the grouping values and the average.
+ schema <- structType(structField("a", "integer"), structField("c", "string"),
+ structField("avg", "double"))
+ df3 <- gapply(
+ df,
+ list("a", "c"),
+ function(key, x) {
+ y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+ },
+ schema)
+ actual <- collect(df3)
+ actual <- actual[order(actual$a), ]
+ rownames(actual) <- NULL
+ expected <- collect(select(df, "a", "b", "c"))
+ expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+ colnames(expected) <- c("a", "c", "avg")
+ expected <- expected[order(expected$a), ]
+ rownames(expected) <- NULL
+ expect_identical(actual, expected)
+
+ irisDF <- suppressWarnings(createDataFrame (iris))
+ schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+ # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
+ df4 <- gapply(
+ cols = list("Sepal_Length"),
+ irisDF,
+ function(key, x) {
+ y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+ },
+ schema)
+ actual <- collect(df4)
+ actual <- actual[order(actual$Sepal_Length), ]
+ rownames(actual) <- NULL
+ agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
+ stringsAsFactors = FALSE)
+ colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+ expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+ rownames(expected) <- NULL
+ expect_identical(actual, expected)
+})
+
test_that("Window functions on a DataFrame", {
setHiveContext(sc)
df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index 40cda0c5ef..debf018018 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -27,6 +27,54 @@ elapsedSecs <- function() {
proc.time()[3]
}
+compute <- function(mode, partition, serializer, deserializer, key,
+ colNames, computeFunc, inputData) {
+ if (mode > 0) {
+ if (deserializer == "row") {
+ # Transform the list of rows into a data.frame
+ # Note that the optional argument stringsAsFactors for rbind is
+ # available since R 3.2.4. So we set the global option here.
+ oldOpt <- getOption("stringsAsFactors")
+ options(stringsAsFactors = FALSE)
+ inputData <- do.call(rbind.data.frame, inputData)
+ options(stringsAsFactors = oldOpt)
+
+ names(inputData) <- colNames
+ } else {
+ # Check to see if inputData is a valid data.frame
+ stopifnot(deserializer == "byte")
+ stopifnot(class(inputData) == "data.frame")
+ }
+
+ if (mode == 2) {
+ output <- computeFunc(key, inputData)
+ } else {
+ output <- computeFunc(inputData)
+ }
+ if (serializer == "row") {
+ # Transform the result data.frame back to a list of rows
+ output <- split(output, seq(nrow(output)))
+ } else {
+ # Serialize the output to a byte array
+ stopifnot(serializer == "byte")
+ }
+ } else {
+ output <- computeFunc(partition, inputData)
+ }
+ return (output)
+}
+
+outputResult <- function(serializer, output, outputCon) {
+ if (serializer == "byte") {
+ SparkR:::writeRawSerialize(outputCon, output)
+ } else if (serializer == "row") {
+ SparkR:::writeRowSerialize(outputCon, output)
+ } else {
+ # write lines one-by-one with flag
+ lapply(output, function(line) SparkR:::writeString(outputCon, line))
+ }
+}
+
# Constants
specialLengths <- list(END_OF_STERAM = 0L, TIMING_DATA = -1L)
@@ -79,75 +127,71 @@ if (numBroadcastVars > 0) {
# Timing broadcast
broadcastElap <- elapsedSecs()
+# Initial input timing
+inputElap <- broadcastElap
# If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int
# as number of partitions to create.
numPartitions <- SparkR:::readInt(inputCon)
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
colNames <- SparkR:::readObject(inputCon)
}
isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
if (isEmpty != 0) {
-
if (numPartitions == -1) {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
data <- SparkR:::readDeserialize(inputCon)
} else if (deserializer == "string") {
data <- as.list(readLines(inputCon))
+ } else if (deserializer == "row" && mode == 2) {
+ dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+ keys <- dataWithKeys$keys
+ data <- dataWithKeys$data
} else if (deserializer == "row") {
data <- SparkR:::readMultipleObjects(inputCon)
}
+
# Timing reading input data for execution
inputElap <- elapsedSecs()
-
- if (isDataFrame) {
- if (deserializer == "row") {
- # Transform the list of rows into a data.frame
- # Note that the optional argument stringsAsFactors for rbind is
- # available since R 3.2.4. So we set the global option here.
- oldOpt <- getOption("stringsAsFactors")
- options(stringsAsFactors = FALSE)
- data <- do.call(rbind.data.frame, data)
- options(stringsAsFactors = oldOpt)
-
- names(data) <- colNames
- } else {
- # Check to see if data is a valid data.frame
- stopifnot(deserializer == "byte")
- stopifnot(class(data) == "data.frame")
- }
- output <- computeFunc(data)
- if (serializer == "row") {
- # Transform the result data.frame back to a list of rows
- output <- split(output, seq(nrow(output)))
- } else {
- # Serialize the ouput to a byte array
- stopifnot(serializer == "byte")
+ if (mode > 0) {
+ if (mode == 1) {
+ output <- compute(mode, partition, serializer, deserializer, NULL,
+ colNames, computeFunc, data)
+ } else {
+ # gapply mode
+ for (i in seq_along(data)) {
+ # Timing reading input data for execution
+ inputElap <- elapsedSecs()
+ output <- compute(mode, partition, serializer, deserializer, keys[[i]],
+ colNames, computeFunc, data[[i]])
+ computeElap <- elapsedSecs()
+ outputResult(serializer, output, outputCon)
+ outputElap <- elapsedSecs()
+ computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
+ outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
+ }
}
} else {
- output <- computeFunc(partition, data)
+ output <- compute(mode, partition, serializer, deserializer, NULL,
+ colNames, computeFunc, data)
}
-
- # Timing computing
- computeElap <- elapsedSecs()
-
- if (serializer == "byte") {
- SparkR:::writeRawSerialize(outputCon, output)
- } else if (serializer == "row") {
- SparkR:::writeRowSerialize(outputCon, output)
- } else {
- # write lines one-by-one with flag
- lapply(output, function(line) SparkR:::writeString(outputCon, line))
+ if (mode != 2) {
+ # Not a gapply mode
+ computeElap <- elapsedSecs()
+ outputResult(serializer, output, outputCon)
+ outputElap <- elapsedSecs()
+ computeInputElapsDiff <- computeElap - inputElap
+ outputComputeElapsDiff <- outputElap - computeElap
}
- # Timing output
- outputElap <- elapsedSecs()
} else {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
@@ -189,11 +233,9 @@ if (isEmpty != 0) {
}
# Timing output
outputElap <- elapsedSecs()
+ computeInputElapsDiff <- computeElap - inputElap
+ outputComputeElapsDiff <- outputElap - computeElap
}
-} else {
- inputElap <- broadcastElap
- computeElap <- broadcastElap
- outputElap <- broadcastElap
}
# Report timing
@@ -202,8 +244,8 @@ SparkR:::writeDouble(outputCon, bootTime)
SparkR:::writeDouble(outputCon, initElap - bootElap) # init
SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast
SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input
-SparkR:::writeDouble(outputCon, computeElap - inputElap) # compute
-SparkR:::writeDouble(outputCon, outputElap - computeElap) # output
+SparkR:::writeDouble(outputCon, computeInputElapsDiff) # compute
+SparkR:::writeDouble(outputCon, outputComputeElapsDiff) # output
# End of output
SparkR:::writeInt(outputCon, specialLengths$END_OF_STERAM)