author    Sun Rui <rui.sun@intel.com>                        2015-05-12 23:52:30 -0700
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>   2015-05-12 23:52:30 -0700
commit    df9b94a57cbd0e028228059d215b446d59d25ba8 (patch)
tree      21fca7d47026df323baaa0f4f84ff38c383cb477 /R
parent    208b902257bbfb85bf8cadfc942b7134ad690f8b (diff)
[SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
Author: Sun Rui <rui.sun@intel.com>

Closes #6007 from sun-rui/SPARK-7482 and squashes the following commits:

5c5cf5e [Sun Rui] Implement alias loadDF() as a new function.
3a30c10 [Sun Rui] Rename load()/save() to read.df()/write.df(). Also add loadDF()/saveDF() as aliases.
9f569d6 [Sun Rui] [SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
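For reference, a quick usage sketch of the renamed user-facing calls, with signatures as they appear in this patch (sqlCtx, jsonPath, and parquetPath assumed to be set up as in the tests below):

    # read/write renames: loadDF()/saveDF() become read.df()/write.df(),
    # with the old names kept as aliases.
    df <- read.df(sqlCtx, jsonPath, "json")            # was loadDF(...)
    write.df(df, parquetPath, "parquet", "overwrite")  # was saveDF(...)

    # sampleDF() becomes sample(), matching Scala's DataFrame.sample().
    sampled <- sample(df, FALSE, 0.5)                  # was sampleDF(...)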
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                   |  6
-rw-r--r--  R/pkg/R/DataFrame.R               | 35
-rw-r--r--  R/pkg/R/RDD.R                     |  4
-rw-r--r--  R/pkg/R/SQLContext.R              | 13
-rw-r--r--  R/pkg/R/generics.R                | 22
-rw-r--r--  R/pkg/inst/tests/test_sparkSQL.R  | 40
6 files changed, 71 insertions(+), 49 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 819e9a24e5..ba29614e7b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -37,7 +37,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
- "sampleDF",
+ "sample",
"sample_frac",
"saveAsParquetFile",
"saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
"unpersist",
"where",
"withColumn",
- "withColumnRenamed")
+ "withColumnRenamed",
+ "write.df")
exportClasses("Column")
@@ -101,6 +102,7 @@ export("cacheTable",
"jsonFile",
"loadDF",
"parquetFile",
+ "read.df",
"sql",
"table",
"tableNames",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 2705817531..a7fa32e291 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -294,8 +294,8 @@ setMethod("registerTempTable",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
-#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
+#' df2 <- read.df(sqlCtx, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
@@ -473,14 +473,14 @@ setMethod("distinct",
dataFrame(sdf)
})
-#' SampleDF
+#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
#'
#' @param x A SparkSQL DataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
-#' @rdname sampleDF
+#' @rdname sample
#' @aliases sample_frac
#' @export
#' @examples
@@ -489,10 +489,10 @@ setMethod("distinct",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
-#' collect(sampleDF(df, FALSE, 0.5))
-#' collect(sampleDF(df, TRUE, 0.5))
+#' collect(sample(df, FALSE, 0.5))
+#' collect(sample(df, TRUE, 0.5))
#'}
-setMethod("sampleDF",
+setMethod("sample",
# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
signature(x = "DataFrame", withReplacement = "logical",
@@ -503,13 +503,13 @@ setMethod("sampleDF",
dataFrame(sdf)
})
-#' @rdname sampleDF
-#' @aliases sampleDF
+#' @rdname sample
+#' @aliases sample
setMethod("sample_frac",
signature(x = "DataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction) {
- sampleDF(x, withReplacement, fraction)
+ sample(x, withReplacement, fraction)
})
#' Count
@@ -1303,7 +1303,7 @@ setMethod("except",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#'
-#' @rdname saveAsTable
+#' @rdname write.df
#' @export
#' @examples
#'\dontrun{
@@ -1311,9 +1311,9 @@ setMethod("except",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
-#' saveAsTable(df, "myfile")
+#' write.df(df, "myfile", "parquet", "overwrite")
#' }
-setMethod("saveDF",
+setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1334,6 +1334,15 @@ setMethod("saveDF",
callJMethod(df@sdf, "save", source, jmode, options)
})
+#' @rdname write.df
+#' @aliases saveDF
+#' @export
+setMethod("saveDF",
+ signature(df = "DataFrame", path = 'character', source = 'character',
+ mode = 'character'),
+ function(df, path = NULL, source = NULL, mode = "append", ...){
+ write.df(df, path, source, mode, ...)
+ })
#' saveAsTable
#'
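The saveDF() alias above simply forwards its arguments to write.df(), so existing scripts keep working. The two calls in this sketch are equivalent after the patch (df and parquetPath as in the tests below):

    write.df(df, parquetPath, "parquet", "overwrite")
    saveDF(df, parquetPath, "parquet", "overwrite")   # forwards to write.df()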
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 9138629cac..d3a68fff78 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
MAXINT)))))
# TODO(zongheng): investigate if this call is an in-place shuffle?
- sample(samples)[1:total]
+ base::sample(samples)[1:total]
})
# Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
- start <- as.integer(sample(numPartitions, 1) - 1)
+ start <- as.integer(base::sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
pos <- (start + i) %% numPartitions
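The base:: qualification above is needed because generics.R (further down) now declares an S4 generic named sample with a DataFrame-oriented signature, which masks base::sample for unqualified calls inside the package. A minimal standalone sketch of the masking behavior (not SparkR code):

    library(methods)

    # Declaring a generic named "sample" with different formals shadows
    # base::sample for unqualified calls in this environment.
    setGeneric("sample", function(x, withReplacement, fraction, seed) {
      standardGeneric("sample")
    })

    # A bare sample(1:10, 3) would now fail S4 dispatch (no matching
    # method); qualifying with base:: reaches the original function.
    base::sample(1:10, 3)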
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index cae06e6af2..531442e845 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
#' \dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
#' registerTempTable(df, "table")
#' dropTempTable(sqlCtx, "table")
#' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- load(sqlCtx, "path/to/file.json", source = "json")
+#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
#' }
-loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
dataFrame(sdf)
}
+#' @aliases loadDF
+#' @export
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+ read.df(sqlCtx, path, source, ...)
+}
+
#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
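Note that read.df() collects any extra named arguments into data-source options via varargsToEnv(). A hedged usage sketch (the option name here is illustrative, not from this patch):

    # Extra named arguments are passed through as data-source options.
    # "samplingRatio" is illustrative; check the chosen source's docs.
    df <- read.df(sqlCtx, "path/to/file.json", source = "json",
                  samplingRatio = "1.0")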
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 557128a419..6d2bfb1181 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @export
setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
-#' @rdname sampleDF
+#' @rdname sample
#' @export
-setGeneric("sample_frac",
+setGeneric("sample",
function(x, withReplacement, fraction, seed) {
- standardGeneric("sample_frac")
- })
+ standardGeneric("sample")
+ })
-#' @rdname sampleDF
+#' @rdname sample
#' @export
-setGeneric("sampleDF",
+setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
- standardGeneric("sampleDF")
- })
+ standardGeneric("sample_frac")
+ })
#' @rdname saveAsParquetFile
#' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
standardGeneric("saveAsTable")
})
-#' @rdname saveAsTable
+#' @rdname write.df
+#' @export
+setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
+
+#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 99c28830c6..1109e8fdba 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
})
test_that("insertInto() on a registered table", {
- df <- loadDF(sqlCtx, jsonPath, "json")
- saveDF(df, parquetPath, "parquet", "overwrite")
- dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+ df <- read.df(sqlCtx, jsonPath, "json")
+ write.df(df, parquetPath, "parquet", "overwrite")
+ dfParquet <- read.df(sqlCtx, parquetPath, "parquet")
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
- df2 <- loadDF(sqlCtx, jsonPath2, "json")
- saveDF(df2, parquetPath2, "parquet", "overwrite")
- dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+ df2 <- read.df(sqlCtx, jsonPath2, "json")
+ write.df(df2, parquetPath2, "parquet", "overwrite")
+ dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")
registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
expect_true(count(uniques) == 3)
})
-test_that("sampleDF on a DataFrame", {
+test_that("sample on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
- sampled <- sampleDF(df, FALSE, 1.0)
+ sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_true(inherits(sampled, "DataFrame"))
- sampled2 <- sampleDF(df, FALSE, 0.1)
+ sampled2 <- sample(df, FALSE, 0.1)
expect_true(count(sampled2) < 3)
# Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
expect_true(count(df2) == 3)
})
-test_that("load() from json file", {
- df <- loadDF(sqlCtx, jsonPath, "json")
+test_that("read.df() from json file", {
+ df <- read.df(sqlCtx, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})
-test_that("save() as parquet file", {
- df <- loadDF(sqlCtx, jsonPath, "json")
- saveDF(df, parquetPath, "parquet", mode="overwrite")
- df2 <- loadDF(sqlCtx, parquetPath, "parquet")
+test_that("write.df() as parquet file", {
+ df <- read.df(sqlCtx, jsonPath, "json")
+ write.df(df, parquetPath, "parquet", mode="overwrite")
+ df2 <- read.df(sqlCtx, parquetPath, "parquet")
expect_true(inherits(df2, "DataFrame"))
expect_true(count(df2) == 3)
})
@@ -670,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath2)
- df2 <- loadDF(sqlCtx, jsonPath2, "json")
+ df2 <- read.df(sqlCtx, jsonPath2, "json")
unioned <- arrange(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
@@ -712,9 +712,9 @@ test_that("mutate() and rename()", {
expect_true(columns(newDF2)[1] == "newerAge")
})
-test_that("saveDF() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with parquetFile", {
df <- jsonFile(sqlCtx, jsonPath)
- saveDF(df, parquetPath, "parquet", mode="overwrite")
+ write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(count(df), count(parquetDF))
@@ -722,9 +722,9 @@ test_that("saveDF() on DataFrame and works with parquetFile", {
test_that("parquetFile works with multiple input paths", {
df <- jsonFile(sqlCtx, jsonPath)
- saveDF(df, parquetPath, "parquet", mode="overwrite")
+ write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
- saveDF(df, parquetPath2, "parquet", mode="overwrite")
+ write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)