From d7d9fa0b8750166f8b74f9bc321df26908683a8b Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 15 Nov 2015 19:15:27 -0800 Subject: [SPARK-11086][SPARKR] Use dropFactors column-wise instead of nested loop when createDataFrame Use `dropFactors` column-wise instead of nested loop when `createDataFrame` from a `data.frame` At this moment SparkR createDataFrame is using nested loop to convert factors to character when called on a local data.frame. It works but is incredibly slow especially with data.table (~ 2 orders of magnitude compared to PySpark / Pandas version on a DateFrame of size 1M rows x 2 columns). A simple improvement is to apply `dropFactor `column-wise and then reshape output list. It should at least partially address [SPARK-8277](https://issues.apache.org/jira/browse/SPARK-8277). Author: zero323 Closes #9099 from zero323/SPARK-11086. --- R/pkg/R/SQLContext.R | 54 ++++++++++++++++++++++++---------------- R/pkg/inst/tests/test_sparkSQL.R | 16 ++++++++++++ 2 files changed, 49 insertions(+), 21 deletions(-) (limited to 'R/pkg') diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index fd013fdb30..a62b25fde9 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -17,27 +17,33 @@ # SQLcontext.R: SQLContext-driven functions + +# Map top level R type to SQL type +getInternalType <- function(x) { + # class of POSIXlt is c("POSIXlt" "POSIXt") + switch(class(x)[[1]], + integer = "integer", + character = "string", + logical = "boolean", + double = "double", + numeric = "double", + raw = "binary", + list = "array", + struct = "struct", + environment = "map", + Date = "date", + POSIXlt = "timestamp", + POSIXct = "timestamp", + stop(paste("Unsupported type for DataFrame:", class(x)))) +} + #' infer the SQL type infer_type <- function(x) { if (is.null(x)) { stop("can not infer type from NULL") } - # class of POSIXlt is c("POSIXlt" "POSIXt") - type <- switch(class(x)[[1]], - integer = "integer", - character = "string", - logical = "boolean", - double = "double", - numeric = "double", - raw = "binary", - list = "array", - struct = "struct", - environment = "map", - Date = "date", - POSIXlt = "timestamp", - POSIXct = "timestamp", - stop(paste("Unsupported type for DataFrame:", class(x)))) + type <- getInternalType(x) if (type == "map") { stopifnot(length(x) > 0) @@ -90,19 +96,25 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0 if (is.null(schema)) { schema <- names(data) } - n <- nrow(data) - m <- ncol(data) + # get rid of factor type - dropFactor <- function(x) { + cleanCols <- function(x) { if (is.factor(x)) { as.character(x) } else { x } } - data <- lapply(1:n, function(i) { - lapply(1:m, function(j) { dropFactor(data[i,j]) }) - }) + + # drop factors and wrap lists + data <- setNames(lapply(data, cleanCols), NULL) + + # check if all columns have supported type + lapply(data, getInternalType) + + # convert to rows + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + data <- do.call(mapply, append(args, data)) } if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index af024e6183..8ff0627659 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -242,6 +242,14 @@ test_that("create DataFrame from list or data.frame", { expect_equal(count(df), 3) ldf2 <- collect(df) expect_equal(ldf$a, ldf2$a) + + irisdf <- createDataFrame(sqlContext, iris) + iris_collected <- collect(irisdf) + expect_equivalent(iris_collected[,-5], iris[,-5]) + expect_equal(iris_collected$Species, as.character(iris$Species)) + + mtcarsdf <- createDataFrame(sqlContext, mtcars) + expect_equivalent(collect(mtcarsdf), mtcars) }) test_that("create DataFrame with different data types", { @@ -283,6 +291,14 @@ test_that("create DataFrame with complex types", { expect_equal(s$b, 3L) }) +test_that("create DataFrame from a data.frame with complex types", { + ldf <- data.frame(row.names=1:2) + ldf$a_list <- list(list(1, 2), list(3, 4)) + sdf <- createDataFrame(sqlContext, ldf) + + expect_equivalent(ldf, collect(sdf)) +}) + # For test map type and struct type in DataFrame mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", -- cgit v1.2.3