aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2016-01-06 12:05:41 +0530
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2016-01-06 12:05:41 +0530
commitd1fea41363c175a67b97cb7b3fe89f9043708739 (patch)
treeba407c27858e56e92e8e4601138418a3cdde10d4 /R
parentb3ba1be3b77e42120145252b2730a56f1d55fd21 (diff)
downloadspark-d1fea41363c175a67b97cb7b3fe89f9043708739.tar.gz
spark-d1fea41363c175a67b97cb7b3fe89f9043708739.tar.bz2
spark-d1fea41363c175a67b97cb7b3fe89f9043708739.zip
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
Add ```read.text``` and ```write.text``` for SparkR. cc sun-rui felixcheung shivaram Author: Yanbo Liang <ybliang8@gmail.com> Closes #10348 from yanboliang/spark-12393.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/NAMESPACE4
-rw-r--r--R/pkg/R/DataFrame.R28
-rw-r--r--R/pkg/R/SQLContext.R26
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R21
5 files changed, 82 insertions, 1 deletion
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ccc01fe169..beacc39500 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
"withColumnRenamed",
"write.df",
"write.json",
- "write.parquet")
+ "write.parquet",
+ "write.text")
exportClasses("Column")
@@ -274,6 +275,7 @@ export("as.DataFrame",
"parquetFile",
"read.df",
"read.parquet",
+ "read.text",
"sql",
"table",
"tableNames",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index c126f9efb4..3bf5bc924f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -664,6 +664,34 @@ setMethod("saveAsParquetFile",
write.parquet(x, path)
})
+#' write.text
+#'
+#' Saves the content of the DataFrame in a text file at the specified path.
+#' The DataFrame must have only one column of string type with the name "value".
+#' Each row becomes a new line in the output file.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.text
+#' @name write.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' write.text(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.text",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "text", path))
+ })
+
#' Distinct
#'
#' Return a new DataFrame containing the distinct rows in this DataFrame.
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ccc683d86a..99679b4a77 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
read.parquet(sqlContext, unlist(list(...)))
}
+#' Create a DataFrame from a text file.
+#'
+#' Loads a text file and returns a DataFrame with a single string column named "value".
+#' Each line in the text file is a new row in the resulting DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.text
+#' @name read.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' }
+read.text <- function(sqlContext, path) {
+ # Allow the user to have a more flexible definition of the text file path
+ paths <- as.list(suppressWarnings(normalizePath(path)))
+ read <- callJMethod(sqlContext, "read")
+ sdf <- callJMethod(read, "text", paths)
+ dataFrame(sdf)
+}
+
#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 62be2ddc8f..ba68617097 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet")
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+#' @rdname write.text
+#' @export
+setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+
#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ebe8faa34c..eaf60beda3 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1497,6 +1497,27 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})
+test_that("read/write text files", {
+ # Test write.df and read.df
+ df <- read.df(sqlContext, jsonPath, "text")
+ expect_is(df, "DataFrame")
+ expect_equal(colnames(df), c("value"))
+ expect_equal(count(df), 3)
+ textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+ write.df(df, textPath, "text", mode="overwrite")
+
+ # Test write.text and read.text
+ textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
+ write.text(df, textPath2)
+ df2 <- read.text(sqlContext, c(textPath, textPath2))
+ expect_is(df2, "DataFrame")
+ expect_equal(colnames(df2), c("value"))
+ expect_equal(count(df2), count(df) * 2)
+
+ unlink(textPath)
+ unlink(textPath2)
+})
+
test_that("describe() and summarize() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")