From d1fea41363c175a67b97cb7b3fe89f9043708739 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 6 Jan 2016 12:05:41 +0530 Subject: [SPARK-12393][SPARKR] Add read.text and write.text for SparkR Add ```read.text``` and ```write.text``` for SparkR. cc sun-rui felixcheung shivaram Author: Yanbo Liang Closes #10348 from yanboliang/spark-12393. --- R/pkg/NAMESPACE | 4 +++- R/pkg/R/DataFrame.R | 28 ++++++++++++++++++++++++++++ R/pkg/R/SQLContext.R | 26 ++++++++++++++++++++++++++ R/pkg/R/generics.R | 4 ++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++++++++++++ 5 files changed, 82 insertions(+), 1 deletion(-) (limited to 'R/pkg') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ccc01fe169..beacc39500 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -94,7 +94,8 @@ exportMethods("arrange", "withColumnRenamed", "write.df", "write.json", - "write.parquet") + "write.parquet", + "write.text") exportClasses("Column") @@ -274,6 +275,7 @@ export("as.DataFrame", "parquetFile", "read.df", "read.parquet", + "read.text", "sql", "table", "tableNames", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index c126f9efb4..3bf5bc924f 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -664,6 +664,34 @@ setMethod("saveAsParquetFile", write.parquet(x, path) }) +#' write.text +#' +#' Saves the content of the DataFrame in a text file at the specified path. +#' The DataFrame must have only one column of string type with the name "value". +#' Each row becomes a new line in the output file. 
+#' +#' @param x A SparkSQL DataFrame +#' @param path The directory where the file is saved +#' +#' @family DataFrame functions +#' @rdname write.text +#' @name write.text +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' path <- "path/to/file.txt" +#' df <- read.text(sqlContext, path) +#' write.text(df, "/tmp/sparkr-tmp/") +#'} +setMethod("write.text", + signature(x = "DataFrame", path = "character"), + function(x, path) { + write <- callJMethod(x@sdf, "write") + invisible(callJMethod(write, "text", path)) + }) + #' Distinct #' #' Return a new DataFrame containing the distinct rows in this DataFrame. diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index ccc683d86a..99679b4a77 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) { read.parquet(sqlContext, unlist(list(...))) } +#' Create a DataFrame from a text file. +#' +#' Loads a text file and returns a DataFrame with a single string column named "value". +#' Each line in the text file is a new row in the resulting DataFrame. +#' +#' @param sqlContext SQLContext to use +#' @param path Path of file to read. A vector of multiple paths is allowed. +#' @return DataFrame +#' @rdname read.text +#' @name read.text +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' path <- "path/to/file.txt" +#' df <- read.text(sqlContext, path) +#' } +read.text <- function(sqlContext, path) { + # Allow the user to have a more flexible definition of the text file path + paths <- as.list(suppressWarnings(normalizePath(path))) + read <- callJMethod(sqlContext, "read") + sdf <- callJMethod(read, "text", paths) + dataFrame(sdf) +} + #' SQL Query #' #' Executes a SQL query using Spark, returning the result as a DataFrame. 
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 62be2ddc8f..ba68617097 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") #' @export setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") }) +#' @rdname write.text +#' @export +setGeneric("write.text", function(x, path) { standardGeneric("write.text") }) + #' @rdname schema #' @export setGeneric("schema", function(x) { standardGeneric("schema") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index ebe8faa34c..eaf60beda3 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1497,6 +1497,27 @@ test_that("read/write Parquet files", { unlink(parquetPath4) }) +test_that("read/write text files", { + # Test write.df and read.df + df <- read.df(sqlContext, jsonPath, "text") + expect_is(df, "DataFrame") + expect_equal(colnames(df), c("value")) + expect_equal(count(df), 3) + textPath <- tempfile(pattern = "textPath", fileext = ".txt") + write.df(df, textPath, "text", mode="overwrite") + + # Test write.text and read.text + textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt") + write.text(df, textPath2) + df2 <- read.text(sqlContext, c(textPath, textPath2)) + expect_is(df2, "DataFrame") + expect_equal(colnames(df2), c("value")) + expect_equal(count(df2), count(df) * 2) + + unlink(textPath) + unlink(textPath2) +}) + test_that("describe() and summarize() on a DataFrame", { df <- read.json(sqlContext, jsonPath) stats <- describe(df, "age") -- cgit v1.2.3