author    zero323 <matthew.szymkiewicz@gmail.com>  2015-11-15 19:15:27 -0800
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-11-15 19:15:27 -0800
commit    d7d9fa0b8750166f8b74f9bc321df26908683a8b (patch)
tree      cbd4e96432c4f54ae07b5417eb97a22db7875b9a /R/pkg
parent    72c1d68b4ab6acb3f85971e10947caabb4bd846d (diff)
[SPARK-11086][SPARKR] Use dropFactors column-wise instead of nested loop when createDataFrame
Use `dropFactors` column-wise instead of a nested loop when `createDataFrame` is called on a local `data.frame`.

At the moment, SparkR's `createDataFrame` uses a nested loop to convert factors to character when called on a local data.frame. It works, but it is incredibly slow, especially with data.table (~2 orders of magnitude slower than the PySpark / Pandas version on a DataFrame of 1M rows x 2 columns). A simple improvement is to apply `dropFactor` column-wise and then reshape the output list. It should at least partially address [SPARK-8277](https://issues.apache.org/jira/browse/SPARK-8277).

Author: zero323 <matthew.szymkiewicz@gmail.com>

Closes #9099 from zero323/SPARK-11086.
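To see why this helps, here is a rough standalone sketch (not code from the patch; `df` and its columns are made up): the old approach calls the conversion helper once per cell, the new one once per column.

    # hypothetical local data.frame with one factor column
    df <- data.frame(x = factor(c("a", "b", "a")), y = 1:3)

    dropFactor <- function(x) {
      if (is.factor(x)) as.character(x) else x
    }

    # old: nested loop, nrow(df) * ncol(df) calls to dropFactor
    rows <- lapply(seq_len(nrow(df)), function(i) {
      lapply(seq_len(ncol(df)), function(j) dropFactor(df[i, j]))
    })

    # new: one call per column, reshape to rows afterwards
    cols <- lapply(df, dropFactor)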
Diffstat (limited to 'R/pkg')
-rw-r--r--  R/pkg/R/SQLContext.R              | 54
-rw-r--r--  R/pkg/inst/tests/test_sparkSQL.R  | 16
2 files changed, 49 insertions(+), 21 deletions(-)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index fd013fdb30..a62b25fde9 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -17,27 +17,33 @@
# SQLcontext.R: SQLContext-driven functions
+
+# Map top level R type to SQL type
+getInternalType <- function(x) {
+ # class of POSIXlt is c("POSIXlt" "POSIXt")
+ switch(class(x)[[1]],
+ integer = "integer",
+ character = "string",
+ logical = "boolean",
+ double = "double",
+ numeric = "double",
+ raw = "binary",
+ list = "array",
+ struct = "struct",
+ environment = "map",
+ Date = "date",
+ POSIXlt = "timestamp",
+ POSIXct = "timestamp",
+ stop(paste("Unsupported type for DataFrame:", class(x))))
+}
+
#' infer the SQL type
infer_type <- function(x) {
if (is.null(x)) {
stop("can not infer type from NULL")
}
- # class of POSIXlt is c("POSIXlt" "POSIXt")
- type <- switch(class(x)[[1]],
- integer = "integer",
- character = "string",
- logical = "boolean",
- double = "double",
- numeric = "double",
- raw = "binary",
- list = "array",
- struct = "struct",
- environment = "map",
- Date = "date",
- POSIXlt = "timestamp",
- POSIXct = "timestamp",
- stop(paste("Unsupported type for DataFrame:", class(x))))
+ type <- getInternalType(x)
if (type == "map") {
stopifnot(length(x) > 0)
@@ -90,19 +96,25 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
if (is.null(schema)) {
schema <- names(data)
}
- n <- nrow(data)
- m <- ncol(data)
+
# get rid of factor type
- dropFactor <- function(x) {
+ cleanCols <- function(x) {
if (is.factor(x)) {
as.character(x)
} else {
x
}
}
- data <- lapply(1:n, function(i) {
- lapply(1:m, function(j) { dropFactor(data[i,j]) })
- })
+
+ # drop factors and wrap lists
+ data <- setNames(lapply(data, cleanCols), NULL)
+
+ # check if all columns have supported type
+ lapply(data, getInternalType)
+
+ # convert to rows
+ args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+ data <- do.call(mapply, append(args, data))
}
if (is.list(data)) {
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
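A note on the reshaping step above: calling `mapply` with `FUN = list`, `SIMPLIFY = FALSE`, and `USE.NAMES = FALSE` effectively transposes a list of columns into a list of rows. A minimal sketch, with made-up column values:

    cols <- list(c(1L, 2L, 3L), c("a", "b", "c"))

    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
    rows <- do.call(mapply, append(args, cols))

    # rows[[1]] is list(1L, "a"); rows[[2]] is list(2L, "b"); rows[[3]] is list(3L, "c")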
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index af024e6183..8ff0627659 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -242,6 +242,14 @@ test_that("create DataFrame from list or data.frame", {
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)
+
+ irisdf <- createDataFrame(sqlContext, iris)
+ iris_collected <- collect(irisdf)
+ expect_equivalent(iris_collected[,-5], iris[,-5])
+ expect_equal(iris_collected$Species, as.character(iris$Species))
+
+ mtcarsdf <- createDataFrame(sqlContext, mtcars)
+ expect_equivalent(collect(mtcarsdf), mtcars)
})
test_that("create DataFrame with different data types", {
@@ -283,6 +291,14 @@ test_that("create DataFrame with complex types", {
expect_equal(s$b, 3L)
})
+test_that("create DataFrame from a data.frame with complex types", {
+ ldf <- data.frame(row.names=1:2)
+ ldf$a_list <- list(list(1, 2), list(3, 4))
+ sdf <- createDataFrame(sqlContext, ldf)
+
+ expect_equivalent(ldf, collect(sdf))
+})
+
# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",