author     felixcheung <felixcheung_m@hotmail.com>    2015-11-28 21:16:21 -0800
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2015-11-28 21:16:21 -0800
commit     c793d2d9a1ccc203fc103eb0636958fe8d71f471 (patch)
tree       a031cf5f43c0808478d738a9cde8ba412a9ab758 /R
parent     28e46ab46368ea3833c8e805163893bbb6f2a265 (diff)
[SPARK-9319][SPARKR] Add support for setting column names, types
Add support for colnames, colnames<-, coltypes<-. Also added tests for names and names<-, which previously had no tests. I merged with PR 8984 (coltypes), clicked the wrong thing and screwed up that PR, so I recreated it here. Was #9218. shivaram sun-rui

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #9654 from felixcheung/colnamescoltypes.
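A minimal usage sketch of the new accessors, pieced together from the roxygen examples in this patch (the JSON path is a placeholder for a two-column name/age file):

    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)
    df <- jsonFile(sqlContext, "path/to/file.json")

    colnames(df)                       # same result as columns(df) / names(df)
    colnames(df) <- c("col1", "col2")  # rename all columns at once

    coltypes(df)                       # R types, e.g. c("character", "numeric")
    coltypes(df) <- c(NA, "integer")   # cast column 2; NA keeps column 1 as-is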
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                     6
-rw-r--r--  R/pkg/R/DataFrame.R               166
-rw-r--r--  R/pkg/R/generics.R                 20
-rw-r--r--  R/pkg/R/types.R                     8
-rw-r--r--  R/pkg/inst/tests/test_sparkSQL.R   40
5 files changed, 185 insertions, 55 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 5d04dd6aca..43e5e0119e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -27,7 +27,10 @@ exportMethods("arrange",
"attach",
"cache",
"collect",
+ "colnames",
+ "colnames<-",
"coltypes",
+ "coltypes<-",
"columns",
"count",
"cov",
@@ -56,6 +59,7 @@ exportMethods("arrange",
"mutate",
"na.omit",
"names",
+ "names<-",
"ncol",
"nrow",
"orderBy",
@@ -276,4 +280,4 @@ export("structField",
"structType",
"structType.jobj",
"structType.structField",
- "print.structType")
\ No newline at end of file
+ "print.structType")
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 8a13e7a367..f89e2682d9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -254,6 +254,7 @@ setMethod("dtypes",
#' @family DataFrame functions
#' @rdname columns
#' @name columns
+
#' @export
#' @examples
#'\dontrun{
@@ -262,6 +263,7 @@ setMethod("dtypes",
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' columns(df)
+#' colnames(df)
#'}
setMethod("columns",
signature(x = "DataFrame"),
@@ -290,6 +292,121 @@ setMethod("names<-",
}
})
+#' @rdname columns
+#' @name colnames
+setMethod("colnames",
+ signature(x = "DataFrame"),
+ function(x) {
+ columns(x)
+ })
+
+#' @rdname columns
+#' @name colnames<-
+setMethod("colnames<-",
+ signature(x = "DataFrame", value = "character"),
+ function(x, value) {
+ sdf <- callJMethod(x@sdf, "toDF", as.list(value))
+ dataFrame(sdf)
+ })
+
+#' coltypes
+#'
+#' Get column types of a DataFrame
+#'
+#' @param x A SparkSQL DataFrame
+#' @return value A character vector with the column types of the given DataFrame
+#' @rdname coltypes
+#' @name coltypes
+#' @family DataFrame functions
+#' @export
+#' @examples
+#'\dontrun{
+#' irisDF <- createDataFrame(sqlContext, iris)
+#' coltypes(irisDF)
+#'}
+setMethod("coltypes",
+ signature(x = "DataFrame"),
+ function(x) {
+ # Get the data types of the DataFrame by invoking dtypes() function
+ types <- sapply(dtypes(x), function(x) {x[[2]]})
+
+ # Map Spark data types into R's data types using DATA_TYPES environment
+ rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {
+ # Check for primitive types
+ type <- PRIMITIVE_TYPES[[x]]
+
+ if (is.null(type)) {
+ # Check for complex types
+ for (t in names(COMPLEX_TYPES)) {
+ if (substring(x, 1, nchar(t)) == t) {
+ type <- COMPLEX_TYPES[[t]]
+ break
+ }
+ }
+
+ if (is.null(type)) {
+ stop(paste("Unsupported data type: ", x))
+ }
+ }
+ type
+ })
+
+ # Find which types don't have mapping to R
+ naIndices <- which(is.na(rTypes))
+
+ # Assign the original scala data types to the unmatched ones
+ rTypes[naIndices] <- types[naIndices]
+
+ rTypes
+ })
+
+#' coltypes
+#'
+#' Set the column types of a DataFrame.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param value A character vector with the target column types for the given
+#' DataFrame. Column types can be one of integer, numeric/double, character, logical, or NA
+#' to keep that column as-is.
+#' @rdname coltypes
+#' @name coltypes<-
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' coltypes(df) <- c("character", "integer")
+#' coltypes(df) <- c(NA, "numeric")
+#'}
+setMethod("coltypes<-",
+ signature(x = "DataFrame", value = "character"),
+ function(x, value) {
+ cols <- columns(x)
+ ncols <- length(cols)
+ if (length(value) == 0) {
+ stop("Cannot set types of an empty DataFrame with no Column")
+ }
+ if (length(value) != ncols) {
+ stop("Length of type vector should match the number of columns for DataFrame")
+ }
+ newCols <- lapply(seq_len(ncols), function(i) {
+ col <- getColumn(x, cols[i])
+ if (!is.na(value[i])) {
+ stype <- rToSQLTypes[[value[i]]]
+ if (is.null(stype)) {
+ stop("Only atomic type is supported for column types")
+ }
+ cast(col, stype)
+ } else {
+ col
+ }
+ })
+ nx <- select(x, newCols)
+ dataFrame(nx@sdf)
+ })
+
#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -2102,52 +2219,3 @@ setMethod("with",
newEnv <- assignNewEnv(data)
eval(substitute(expr), envir = newEnv, enclos = newEnv)
})
-
-#' Returns the column types of a DataFrame.
-#'
-#' @name coltypes
-#' @title Get column types of a DataFrame
-#' @family dataframe_funcs
-#' @param x (DataFrame)
-#' @return value (character) A character vector with the column types of the given DataFrame
-#' @rdname coltypes
-#' @examples \dontrun{
-#' irisDF <- createDataFrame(sqlContext, iris)
-#' coltypes(irisDF)
-#' }
-setMethod("coltypes",
- signature(x = "DataFrame"),
- function(x) {
- # Get the data types of the DataFrame by invoking dtypes() function
- types <- sapply(dtypes(x), function(x) {x[[2]]})
-
- # Map Spark data types into R's data types using DATA_TYPES environment
- rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {
-
- # Check for primitive types
- type <- PRIMITIVE_TYPES[[x]]
-
- if (is.null(type)) {
- # Check for complex types
- for (t in names(COMPLEX_TYPES)) {
- if (substring(x, 1, nchar(t)) == t) {
- type <- COMPLEX_TYPES[[t]]
- break
- }
- }
-
- if (is.null(type)) {
- stop(paste("Unsupported data type: ", x))
- }
- }
- type
- })
-
- # Find which types don't have mapping to R
- naIndices <- which(is.na(rTypes))
-
- # Assign the original scala data types to the unmatched ones
- rTypes[naIndices] <- types[naIndices]
-
- rTypes
- })
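As a concrete illustration of the getter above: on the iris example from the roxygen docs, the four measurement columns map to "numeric" and Species to "character" (exactly what the new test in test_sparkSQL.R asserts), while Spark types with no R mapping fall back to the original Scala type string.

    irisDF <- createDataFrame(sqlContext, iris)
    coltypes(irisDF)
    # [1] "numeric"   "numeric"   "numeric"   "numeric"   "character"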
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0c305441e0..711ce38f9e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -385,6 +385,22 @@ setGeneric("agg", function (x, ...) { standardGeneric("agg") })
#' @export
setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })
+#' @rdname columns
+#' @export
+setGeneric("colnames", function(x, do.NULL = TRUE, prefix = "col") { standardGeneric("colnames") })
+
+#' @rdname columns
+#' @export
+setGeneric("colnames<-", function(x, value) { standardGeneric("colnames<-") })
+
+#' @rdname coltypes
+#' @export
+setGeneric("coltypes", function(x) { standardGeneric("coltypes") })
+
+#' @rdname coltypes
+#' @export
+setGeneric("coltypes<-", function(x, value) { standardGeneric("coltypes<-") })
+
#' @rdname schema
#' @export
setGeneric("columns", function(x) {standardGeneric("columns") })
@@ -1081,7 +1097,3 @@ setGeneric("attach")
#' @rdname with
#' @export
setGeneric("with")
-
-#' @rdname coltypes
-#' @export
-setGeneric("coltypes", function(x) { standardGeneric("coltypes") })
diff --git a/R/pkg/R/types.R b/R/pkg/R/types.R
index 1828c23ab0..dae4fe858b 100644
--- a/R/pkg/R/types.R
+++ b/R/pkg/R/types.R
@@ -41,3 +41,11 @@ COMPLEX_TYPES <- list(
# The full list of data types.
DATA_TYPES <- as.environment(c(as.list(PRIMITIVE_TYPES), COMPLEX_TYPES))
+
+# An environment for mapping R to Scala, names are R types and values are Scala types.
+rToSQLTypes <- as.environment(list(
+ "integer" = "integer", # in R, integer is 32bit
+ "numeric" = "double", # in R, numeric == double which is 64bit
+ "double" = "double",
+ "character" = "string",
+ "logical" = "boolean"))
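For illustration: looking up a name that is absent from this environment returns NULL, which is how the coltypes<- setter above rejects non-atomic targets. A quick sketch, not part of the patch:

    rToSQLTypes[["numeric"]]    # "double"
    rToSQLTypes[["character"]]  # "string"
    rToSQLTypes[["list"]]       # NULL, so coltypes<- raises
                                # "Only atomic type is supported for column types"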
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 899fc3b977..d3b2f20bf8 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -622,6 +622,26 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form
expect_equal(testNames[2], "name")
})
+test_that("names() colnames() set the column names", {
+ df <- jsonFile(sqlContext, jsonPath)
+ names(df) <- c("col1", "col2")
+ expect_equal(colnames(df)[2], "col2")
+
+ colnames(df) <- c("col3", "col4")
+ expect_equal(names(df)[1], "col3")
+
+ # Test base::colnames base::names
+ m2 <- cbind(1, 1:4)
+ expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
+ colnames(m2) <- c("x","Y")
+ expect_equal(colnames(m2), c("x", "Y"))
+
+ z <- list(a = 1, b = "c", c = 1:3)
+ expect_equal(names(z)[3], "c")
+ names(z)[3] <- "c2"
+ expect_equal(names(z)[3], "c2")
+})
+
test_that("head() and first() return the correct data", {
df <- jsonFile(sqlContext, jsonPath)
testHead <- head(df)
@@ -1617,7 +1637,7 @@ test_that("with() on a DataFrame", {
expect_equal(nrow(sum2), 35)
})
-test_that("Method coltypes() to get R's data types of a DataFrame", {
+test_that("Method coltypes() to get and set R's data types of a DataFrame", {
expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))
data <- data.frame(c1=c(1,2,3),
@@ -1636,6 +1656,24 @@ test_that("Method coltypes() to get R's data types of a DataFrame", {
x <- createDataFrame(sqlContext, list(list(as.environment(
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")
+
+ df <- selectExpr(jsonFile(sqlContext, jsonPath), "name", "(age * 1.21) as age")
+ expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
+
+ df1 <- select(df, cast(df$age, "integer"))
+ coltypes(df) <- c("character", "integer")
+ expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
+ value <- collect(df[, 2])[[3, 1]]
+ expect_equal(value, collect(df1)[[3, 1]])
+ expect_equal(value, 22)
+
+ coltypes(df) <- c(NA, "numeric")
+ expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))
+
+ expect_error(coltypes(df) <- c("character"),
+ "Length of type vector should match the number of columns for DataFrame")
+ expect_error(coltypes(df) <- c("environment", "list"),
+ "Only atomic type is supported for column types")
})
unlink(parquetPath)
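A note on the expected value 22 in the cast test above, assuming the usual SparkR people.json fixture where the third row's age is 19: age * 1.21 yields 22.99 as decimal(24,2), and casting that to integer truncates toward zero, so collect(df[, 2])[[3, 1]] is 22, matching the directly cast df1.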