diff options
Diffstat (limited to 'R/pkg/inst/worker/worker.R')
-rw-r--r-- | R/pkg/inst/worker/worker.R | 36 |
1 files changed, 35 insertions, 1 deletions
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index b6784dbae3..40cda0c5ef 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -84,6 +84,13 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) +isDataFrame <- as.logical(SparkR:::readInt(inputCon)) + +# If isDataFrame, then read column names +if (isDataFrame) { + colNames <- SparkR:::readObject(inputCon) +} + isEmpty <- SparkR:::readInt(inputCon) if (isEmpty != 0) { @@ -100,7 +107,34 @@ if (isEmpty != 0) { # Timing reading input data for execution inputElap <- elapsedSecs() - output <- computeFunc(partition, data) + if (isDataFrame) { + if (deserializer == "row") { + # Transform the list of rows into a data.frame + # Note that the optional argument stringsAsFactors for rbind is + # available since R 3.2.4. So we set the global option here. + oldOpt <- getOption("stringsAsFactors") + options(stringsAsFactors = FALSE) + data <- do.call(rbind.data.frame, data) + options(stringsAsFactors = oldOpt) + + names(data) <- colNames + } else { + # Check to see if data is a valid data.frame + stopifnot(deserializer == "byte") + stopifnot(class(data) == "data.frame") + } + output <- computeFunc(data) + if (serializer == "row") { + # Transform the result data.frame back to a list of rows + output <- split(output, seq(nrow(output))) + } else { + # Serialize the output to a byte array + stopifnot(serializer == "byte") + } + } else { + output <- computeFunc(partition, data) + } + # Timing computing computeElap <- elapsedSecs() |