author     Dongjoon Hyun <dongjoon@apache.org>               2016-06-20 11:30:26 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-06-20 11:30:26 -0700
commit     c44bf137c7ca649e0c504229eb3e6ff7955e9a53 (patch)
tree       fa7fade42068841c8fb8403b0c7434eefc874e55 /R
parent     36e812d4b695566437c6bac991ef06a0f81fb1c5 (diff)
[SPARK-16051][R] Add `read.orc/write.orc` to SparkR
## What changes were proposed in this pull request?

This issue adds `read.orc/write.orc` to SparkR for API parity.

## How was this patch tested?

Pass the Jenkins tests (with new testcases).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13763 from dongjoon-hyun/SPARK-16051.
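For context, a minimal usage sketch (not part of the patch) of the API this change adds; the paths below are placeholders, and the workflow deliberately mirrors the existing read.json/read.parquet functions:

# Hedged sketch; all paths here are hypothetical.
library(SparkR)
sparkR.session()

people <- read.json("/tmp/people.json")        # any existing SparkDataFrame source
write.orc(people, "/tmp/people-orc")           # new: write out as ORC, preserving the schema
people2 <- read.orc("/tmp/people-orc")         # new: read the ORC directory back
head(people2)

sparkR.session.stop()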
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                            |  2
-rw-r--r--  R/pkg/R/DataFrame.R                        | 27
-rw-r--r--  R/pkg/R/SQLContext.R                       | 21
-rw-r--r--  R/pkg/R/generics.R                         |  4
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  | 21
5 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cc129a73fe..aaeab665a4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
"write.df",
"write.jdbc",
"write.json",
+ "write.orc",
"write.parquet",
"write.text",
"write.ml")
@@ -306,6 +307,7 @@ export("as.DataFrame",
"read.df",
"read.jdbc",
"read.json",
+ "read.orc",
"read.parquet",
"read.text",
"spark.lapply",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ea091c8101..f3a3eff46d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -701,6 +701,33 @@ setMethod("write.json",
invisible(callJMethod(write, "json", path))
})
+#' Save the contents of a SparkDataFrame as an ORC file, preserving the schema.
+#'
+#' Save the contents of a SparkDataFrame as an ORC file, preserving the schema. Files written out
+#' with this method can be read back in as a SparkDataFrame using read.orc().
+#'
+#' @param x A SparkDataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family SparkDataFrame functions
+#' @rdname write.orc
+#' @name write.orc
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' write.orc(df, "/tmp/sparkr-tmp1/")
+#' }
+#' @note write.orc since 2.0.0
+setMethod("write.orc",
+ signature(x = "SparkDataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "orc", path))
+ })
+
#' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
#'
#' Save the contents of a SparkDataFrame as a Parquet file, preserving the schema. Files written out
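For readers comparing writers, a brief sketch (not part of the patch) of how the new write.orc method relates to the generic data-source writer write.df; df and the paths here are hypothetical:

# Hedged sketch; df is assumed to be an existing SparkDataFrame.
write.orc(df, "/tmp/out-orc")                                      # new convenience method
write.df(df, "/tmp/out-orc2", source = "orc", mode = "overwrite")  # generic call, as in the tests below

Both routes end up on the JVM DataFrameWriter; the convenience method simply invokes the writer's orc(path) via callJMethod, just as write.json and write.parquet already do for their formats.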
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b0ccc42ff8..b7e1c062c7 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -330,6 +330,25 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
}
}
+#' Create a SparkDataFrame from an ORC file.
+#'
+#' Loads an ORC file, returning the result as a SparkDataFrame.
+#'
+#' @param path Path of file to read.
+#' @return SparkDataFrame
+#' @rdname read.orc
+#' @export
+#' @name read.orc
+#' @note read.orc since 2.0.0
+read.orc <- function(path) {
+ sparkSession <- getSparkSession()
+ # Allow the user to have a more flexible definition of the ORC file path
+ path <- suppressWarnings(normalizePath(path))
+ read <- callJMethod(sparkSession, "read")
+ sdf <- callJMethod(read, "orc", path)
+ dataFrame(sdf)
+}
+
#' Create a SparkDataFrame from a Parquet file.
#'
#' Loads a Parquet file, returning the result as a SparkDataFrame.
@@ -343,7 +362,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
read.parquet.default <- function(path) {
sparkSession <- getSparkSession()
- # Allow the user to have a more flexible definiton of the text file path
+ # Allow the user to have a more flexible definition of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
sdf <- callJMethod(read, "parquet", paths)
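As a usage note, a minimal sketch (not part of the patch) of reading the ORC output back; because read.orc runs normalizePath() on its argument, shortcuts such as "~" in the hypothetical path below are expanded before the call reaches the JVM reader:

# Hedged sketch; the directory is assumed to contain ORC files written earlier.
df <- read.orc("~/data/events-orc")                    # "~" is expanded by normalizePath()
printSchema(df)
count(df)
df2 <- read.df("~/data/events-orc", source = "orc")    # generic reader, same result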
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 37d05560c3..dcc1cf241f 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -610,6 +610,10 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+#' @rdname write.orc
+#' @export
+setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+
#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ceba0d138e..114fec6e36 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -68,6 +68,7 @@ mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
writeLines(mockLines, jsonPath)
# For test nafunctions, like dropna(), fillna(),...
@@ -1667,6 +1668,25 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})
+test_that("read/write ORC files", {
+ df <- read.df(jsonPath, "json")
+
+ # Test write.df and read.df
+ write.df(df, orcPath, "orc", mode = "overwrite")
+ df2 <- read.df(orcPath, "orc")
+ expect_is(df2, "SparkDataFrame")
+ expect_equal(count(df), count(df2))
+
+ # Test write.orc and read.orc
+ orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+ write.orc(df, orcPath2)
+ orcDF <- read.orc(orcPath2)
+ expect_is(orcDF, "SparkDataFrame")
+ expect_equal(count(orcDF), count(df))
+
+ unlink(orcPath2)
+})
+
test_that("read/write Parquet files", {
df <- read.df(jsonPath, "json")
# Test write.df and read.df
@@ -2351,5 +2371,6 @@ test_that("enableHiveSupport on SparkSession", {
})
unlink(parquetPath)
+unlink(orcPath)
unlink(jsonPath)
unlink(jsonPathNa)