aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorFelix Cheung <felixcheung_m@hotmail.com>2017-03-19 22:34:18 -0700
committerFelix Cheung <felixcheung@apache.org>2017-03-19 22:34:18 -0700
commitc40597720e8e66a6b11ca241b1ad387154a8fe72 (patch)
tree59fbba7991666313502bd2b80d2d41d6d37ee593 /R
parent0cdcf9114527a2c359c25e46fd6556b3855bfb28 (diff)
downloadspark-c40597720e8e66a6b11ca241b1ad387154a8fe72.tar.gz
spark-c40597720e8e66a6b11ca241b1ad387154a8fe72.tar.bz2
spark-c40597720e8e66a6b11ca241b1ad387154a8fe72.zip
[SPARK-20020][SPARKR] DataFrame checkpoint API
## What changes were proposed in this pull request? Add checkpoint, setCheckpointDir API to R ## How was this patch tested? unit tests, manual tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17351 from felixcheung/rdfcheckpoint.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/NAMESPACE2
-rw-r--r--R/pkg/R/DataFrame.R29
-rw-r--r--R/pkg/R/RDD.R2
-rw-r--r--R/pkg/R/context.R21
-rw-r--r--R/pkg/R/generics.R6
-rw-r--r--R/pkg/inst/tests/testthat/test_rdd.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R11
7 files changed, 70 insertions, 5 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 78344ce9ff..8be7875ad2 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
+ "checkpoint",
"coalesce",
"collect",
"colnames",
@@ -369,6 +370,7 @@ export("as.DataFrame",
"read.parquet",
"read.stream",
"read.text",
+ "setCheckpointDir",
"spark.lapply",
"spark.addFile",
"spark.getSparkFilesRootDirectory",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index bc81633815..97786df4ae 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3613,3 +3613,32 @@ setMethod("write.stream",
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
})
+
+#' checkpoint
+#'
+#' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be used to truncate the
+#' logical plan, which is especially useful in iterative algorithms where the plan may grow
+#' exponentially. It will be saved to files inside the checkpoint directory set with
+#' \code{setCheckpointDir}
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to checkpoint this SparkDataFrame immediately
+#' @return a new checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases checkpoint,SparkDataFrame-method
+#' @rdname checkpoint
+#' @name checkpoint
+#' @seealso \link{setCheckpointDir}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#' df <- checkpoint(df)
+#' }
+#' @note checkpoint since 2.2.0
+setMethod("checkpoint",
+ signature(x = "SparkDataFrame"),
+ function(x, eager = TRUE) {
+ df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
+ dataFrame(df)
+ })
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 5667b9d788..7ad3993e9e 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -291,7 +291,7 @@ setMethod("unpersistRDD",
#' @rdname checkpoint-methods
#' @aliases checkpoint,RDD-method
#' @noRd
-setMethod("checkpoint",
+setMethod("checkpointRDD",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1a0dd65f45..cb0f83b2fa 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -291,7 +291,7 @@ broadcast <- function(sc, object) {
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
-setCheckpointDir <- function(sc, dirName) {
+setCheckpointDirSC <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
@@ -410,3 +410,22 @@ setLogLevel <- function(level) {
sc <- getSparkContext()
invisible(callJMethod(sc, "setLogLevel", level))
}
+
+#' Set checkpoint directory
+#'
+#' Set the directory under which SparkDataFrames are going to be checkpointed. The directory must be
+#' an HDFS path if running on a cluster.
+#'
+#' @rdname setCheckpointDir
+#' @param directory Directory path to checkpoint to
+#' @seealso \link{checkpoint}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#'}
+#' @note setCheckpointDir since 2.2.0
+setCheckpointDir <- function(directory) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(directory))))
+}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 029771289f..80283e48ce 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -32,7 +32,7 @@ setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coa
# @rdname checkpoint-methods
# @export
-setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
+setGeneric("checkpointRDD", function(x) { standardGeneric("checkpointRDD") })
setGeneric("collectRDD", function(x, ...) { standardGeneric("collectRDD") })
@@ -406,6 +406,10 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })
+#' @rdname checkpoint
+#' @export
+setGeneric("checkpoint", function(x, eager = TRUE) { standardGeneric("checkpoint") })
+
#' @rdname coalesce
#' @param x a Column or a SparkDataFrame.
#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 787ef51c50..b72c801dd9 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -143,8 +143,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
expect_false(rdd2@env$isCached)
tempDir <- tempfile(pattern = "checkpoint")
- setCheckpointDir(sc, tempDir)
- checkpoint(rdd2)
+ setCheckpointDirSC(sc, tempDir)
+ checkpointRDD(rdd2)
expect_true(rdd2@env$isCheckpointed)
rdd2 <- lapply(rdd2, function(x) x)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9c38e0d866..cbc3569795 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -841,6 +841,17 @@ test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame",
expect_true(is.data.frame(collect(df)))
})
+test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
+ checkpointDir <- file.path(tempdir(), "cproot")
+ expect_true(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+
+ setCheckpointDir(checkpointDir)
+ df <- read.json(jsonPath)
+ df <- checkpoint(df)
+ expect_is(df, "SparkDataFrame")
+ expect_false(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+})
+
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
df <- read.json(jsonPath)
testSchema <- schema(df)