aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/worker/worker.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/inst/worker/worker.R')
-rw-r--r-- R/pkg/inst/worker/worker.R 36
1 files changed, 35 insertions, 1 deletions
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index b6784dbae3..40cda0c5ef 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -84,6 +84,13 @@ broadcastElap <- elapsedSecs()
# as number of partitions to create.
numPartitions <- SparkR:::readInt(inputCon)
+isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+
+# isDataFrame is read with readInt and coerced to logical; if TRUE, read column names
+if (isDataFrame) {
+ colNames <- SparkR:::readObject(inputCon)
+}
+
isEmpty <- SparkR:::readInt(inputCon)
if (isEmpty != 0) {
@@ -100,7 +107,34 @@ if (isEmpty != 0) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
- output <- computeFunc(partition, data)
+ if (isDataFrame) {
+   if (deserializer == "row") {
+     # Transform the list of rows into a data.frame. rbind only takes a
+     # stringsAsFactors argument since R 3.2.4, so the global option is set
+     # instead; finally = restores it even if binding the rows fails.
+     oldOpt <- options(stringsAsFactors = FALSE)
+     data <- tryCatch(do.call(rbind.data.frame, data),
+                      finally = options(oldOpt))
+
+     names(data) <- colNames
+   } else {
+     # Check to see if data is a valid data.frame
+     stopifnot(deserializer == "byte")
+     stopifnot(is.data.frame(data))
+   }
+   output <- computeFunc(data)
+   if (serializer == "row") {
+     # Transform the result data.frame back to a list of rows. seq_len()
+     # yields no groups for a 0-row result, where seq() would give c(1, 0).
+     output <- split(output, seq_len(nrow(output)))
+   } else {
+     # The only other supported serializer is "byte"; output is left as is
+     stopifnot(serializer == "byte")
+   }
+ } else {
+   output <- computeFunc(partition, data)
+ }
+
# Timing computing
computeElap <- elapsedSecs()