about summary refs log tree commit diff
path: root/R
diff options
context:
space:
mode:
authorSun Rui <rui.sun@intel.com>2015-09-16 13:20:39 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-09-16 13:20:39 -0700
commit896edb51ab7a88bbb31259e565311a9be6f2ca6d (patch)
treea21dd5c096fc1cacb309b7a5bf7ea342e0b4b4fe /R
parent5dbaf3d3911bbfa003bc75459aaad66b4f6e0c67 (diff)
downloadspark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.gz
spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.tar.bz2
spark-896edb51ab7a88bbb31259e565311a9be6f2ca6d.zip
[SPARK-10050] [SPARKR] Support collecting data of MapType in DataFrame.
1. Support collecting data of MapType from DataFrame. 2. Support data of MapType in createDataFrame. Author: Sun Rui <rui.sun@intel.com> Closes #8711 from sun-rui/SPARK-10050.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/R/SQLContext.R5
-rw-r--r--R/pkg/R/deserialize.R14
-rw-r--r--R/pkg/R/schema.R34
-rw-r--r--R/pkg/inst/tests/test_sparkSQL.R56
4 files changed, 86 insertions(+), 23 deletions(-)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 4ac057d0f2..1c58fd96d7 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -41,10 +41,7 @@ infer_type <- function(x) {
if (type == "map") {
stopifnot(length(x) > 0)
key <- ls(x)[[1]]
- list(type = "map",
- keyType = "string",
- valueType = infer_type(get(key, x)),
- valueContainsNull = TRUE)
+ paste0("map<string,", infer_type(get(key, x)), ">")
} else if (type == "array") {
stopifnot(length(x) > 0)
names <- names(x)
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index d1858ec227..ce88d0b071 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -50,6 +50,7 @@ readTypedObject <- function(con, type) {
"t" = readTime(con),
"a" = readArray(con),
"l" = readList(con),
+ "e" = readEnv(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop(paste("Unsupported type for deserialization", type)))
@@ -121,6 +122,19 @@ readList <- function(con) {
}
}
+readEnv <- function(con) {
+ env <- new.env()
+ len <- readInt(con)
+ if (len > 0) {
+ for (i in 1:len) {
+ key <- readString(con)
+ value <- readObject(con)
+ env[[key]] <- value
+ }
+ }
+ env
+}
+
readRaw <- function(con) {
dataLen <- readInt(con)
readBin(con, raw(), as.integer(dataLen), endian = "big")
diff --git a/R/pkg/R/schema.R b/R/pkg/R/schema.R
index 62d4f73878..8df1563f8e 100644
--- a/R/pkg/R/schema.R
+++ b/R/pkg/R/schema.R
@@ -131,13 +131,33 @@ checkType <- function(type) {
if (type %in% primtiveTypes) {
return()
} else {
- m <- regexec("^array<(.*)>$", type)
- matchedStrings <- regmatches(type, m)
- if (length(matchedStrings[[1]]) >= 2) {
- elemType <- matchedStrings[[1]][2]
- checkType(elemType)
- return()
- }
+ # Check complex types
+ firstChar <- substr(type, 1, 1)
+ switch (firstChar,
+ a = {
+ # Array type
+ m <- regexec("^array<(.*)>$", type)
+ matchedStrings <- regmatches(type, m)
+ if (length(matchedStrings[[1]]) >= 2) {
+ elemType <- matchedStrings[[1]][2]
+ checkType(elemType)
+ return()
+ }
+ },
+ m = {
+ # Map type
+ m <- regexec("^map<(.*),(.*)>$", type)
+ matchedStrings <- regmatches(type, m)
+ if (length(matchedStrings[[1]]) >= 3) {
+ keyType <- matchedStrings[[1]][2]
+ if (keyType != "string" && keyType != "character") {
+ stop("Key type in a map must be string or character")
+ }
+ valueType <- matchedStrings[[1]][3]
+ checkType(valueType)
+ return()
+ }
+ })
}
stop(paste("Unsupported type for Dataframe:", type))
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 98d4402d36..e159a69584 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -57,7 +57,7 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)
-test_that("infer types", {
+test_that("infer types and check types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
expect_equal(infer_type("abc"), "string")
@@ -72,9 +72,9 @@ test_that("infer types", {
checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE)
e <- new.env()
assign("a", 1L, envir = e)
- expect_equal(infer_type(e),
- list(type = "map", keyType = "string", valueType = "integer",
- valueContainsNull = TRUE))
+ expect_equal(infer_type(e), "map<string,integer>")
+
+ expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")
})
test_that("structType and structField", {
@@ -242,7 +242,7 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})
-test_that("create DataFrame with nested array and struct", {
+test_that("create DataFrame with nested array and map", {
# e <- new.env()
# assign("n", 3L, envir = e)
# l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L))
@@ -253,21 +253,35 @@ test_that("create DataFrame with nested array and struct", {
# ldf <- collect(df)
# expect_equal(ldf[1,], l[[1]])
+ # ArrayType and MapType
+ e <- new.env()
+ assign("n", 3L, envir = e)
- # ArrayType only for now
- l <- list(as.list(1:10), list("a", "b"))
- df <- createDataFrame(sqlContext, list(l), c("a", "b"))
- expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>")))
+ l <- list(as.list(1:10), list("a", "b"), e)
+ df <- createDataFrame(sqlContext, list(l), c("a", "b", "c"))
+ expect_equal(dtypes(df), list(c("a", "array<int>"),
+ c("b", "array<string>"),
+ c("c", "map<string,int>")))
expect_equal(count(df), 1)
ldf <- collect(df)
- expect_equal(names(ldf), c("a", "b"))
+ expect_equal(names(ldf), c("a", "b", "c"))
expect_equal(ldf[1, 1][[1]], l[[1]])
expect_equal(ldf[1, 2][[1]], l[[2]])
+ e <- ldf$c[[1]]
+ expect_equal(class(e), "environment")
+ expect_equal(ls(e), "n")
+ expect_equal(e$n, 3L)
})
+# For test map type in DataFrame
+mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
+ "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
+ "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
+mapTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+writeLines(mockLinesMapType, mapTypeJsonPath)
+
test_that("Collect DataFrame with complex types", {
- # only ArrayType now
- # TODO: tests for StructType and MapType after they are supported
+ # ArrayType
df <- jsonFile(sqlContext, complexTypeJsonPath)
ldf <- collect(df)
@@ -277,6 +291,24 @@ test_that("Collect DataFrame with complex types", {
expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9)))
expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i")))
expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0)))
+
+ # MapType
+ schema <- structType(structField("name", "string"),
+ structField("info", "map<string,double>"))
+ df <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
+ expect_equal(dtypes(df), list(c("name", "string"),
+ c("info", "map<string,double>")))
+ ldf <- collect(df)
+ expect_equal(nrow(ldf), 3)
+ expect_equal(ncol(ldf), 2)
+ expect_equal(names(ldf), c("name", "info"))
+ expect_equal(ldf$name, c("Bob", "Alice", "David"))
+ bob <- ldf$info[[1]]
+ expect_equal(class(bob), "environment")
+ expect_equal(bob$age, 16)
+ expect_equal(bob$height, 176.5)
+
+ # TODO: tests for StructType after it is supported
})
test_that("jsonFile() on a local file returns a DataFrame", {