diff options
author | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-06-05 10:19:03 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-06-05 10:19:03 -0700 |
commit | 12f5eaeee1235850a076ce5716d069bd2f1205a5 (patch) | |
tree | 6d19fda4584e07e07a97388987891a88ff468d1c /R | |
parent | bc0d76a246cc534234b96a661d70feb94b26538c (diff) | |
download | spark-12f5eaeee1235850a076ce5716d069bd2f1205a5.tar.gz spark-12f5eaeee1235850a076ce5716d069bd2f1205a5.tar.bz2 spark-12f5eaeee1235850a076ce5716d069bd2f1205a5.zip |
[SPARK-8085] [SPARKR] Support user-specified schema in read.df
cc davies sun-rui
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Closes #6620 from shivaram/sparkr-read-schema and squashes the following commits:
16a6726 [Shivaram Venkataraman] Fix loadDF to pass schema Also add a unit test
a229877 [Shivaram Venkataraman] Use wrapper function to DataFrameReader
ee70ba8 [Shivaram Venkataraman] Support user-specified schema in read.df
Diffstat (limited to 'R')
-rw-r--r-- | R/pkg/R/SQLContext.R | 14 | ||||
-rw-r--r-- | R/pkg/inst/tests/test_sparkSQL.R | 13 |
2 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 88e1a508f3..22a4b5bf86 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -452,7 +452,7 @@ dropTempTable <- function(sqlContext, tableName) { #' df <- read.df(sqlContext, "path/to/file.json", source = "json") #' } -read.df <- function(sqlContext, path = NULL, source = NULL, ...) { +read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { options <- varargsToEnv(...) if (!is.null(path)) { options[['path']] <- path @@ -462,15 +462,21 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) { source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } - sdf <- callJMethod(sqlContext, "load", source, options) + if (!is.null(schema)) { + stopifnot(class(schema) == "structType") + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, + schema$jobj, options) + } else { + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, options) + } dataFrame(sdf) } #' @aliases loadDF #' @export -loadDF <- function(sqlContext, path = NULL, source = NULL, ...) { - read.df(sqlContext, path, source, ...) +loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { + read.df(sqlContext, path, source, schema, ...) 
} #' Create an external table diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index d2d82e791e..30edfc8a7b 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -504,6 +504,19 @@ test_that("read.df() from json file", { df <- read.df(sqlContext, jsonPath, "json") expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 3) + + # Check if we can apply a user defined schema + schema <- structType(structField("name", type = "string"), + structField("age", type = "double")) + + df1 <- read.df(sqlContext, jsonPath, "json", schema) + expect_true(inherits(df1, "DataFrame")) + expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) + + # Run the same with loadDF + df2 <- loadDF(sqlContext, jsonPath, "json", schema) + expect_true(inherits(df2, "DataFrame")) + expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) }) test_that("write.df() as parquet file", { |