aboutsummaryrefslogtreecommitdiff
path: root/R/pkg
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-10-26 13:26:43 -0700
committerFelix Cheung <felixcheung@apache.org>2016-10-26 13:26:43 -0700
commitfb0a8a8dd7e8985676a846684b956e2d988875c6 (patch)
treed4949992a9fce376e9aa1a6e9ed4d69f5971f750 /R/pkg
parentea3605e82545031a00235ee0f449e1e2418674e8 (diff)
downloadspark-fb0a8a8dd7e8985676a846684b956e2d988875c6.tar.gz
spark-fb0a8a8dd7e8985676a846684b956e2d988875c6.tar.bz2
spark-fb0a8a8dd7e8985676a846684b956e2d988875c6.zip
[SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR
## What changes were proposed in this pull request? Add storageLevel to DataFrame for SparkR. This is similar to this PR: https://github.com/apache/spark/pull/13780 but in R I do not make a class for `StorageLevel` but add a method `storageToString` ## How was this patch tested? test added. Author: WeichenXu <WeichenXu123@outlook.com> Closes #15516 from WeichenXu123/storageLevel_df_r.
Diffstat (limited to 'R/pkg')
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/DataFrame.R28
-rw-r--r--R/pkg/R/RDD.R2
-rw-r--r--R/pkg/R/generics.R6
-rw-r--r--R/pkg/R/utils.R41
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R5
6 files changed, 79 insertions, 4 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8718185171..eb314f4718 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -124,6 +124,7 @@ exportMethods("arrange",
"selectExpr",
"show",
"showDF",
+ "storageLevel",
"subset",
"summarize",
"summary",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b6ce838969..be34e4b32f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -633,7 +633,7 @@ setMethod("persist",
#' @param ... further arguments to be passed to or from other methods.
#'
#' @family SparkDataFrame functions
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @aliases unpersist,SparkDataFrame-method
#' @name unpersist
#' @export
@@ -654,6 +654,32 @@ setMethod("unpersist",
x
})
+#' StorageLevel
+#'
+#' Get storagelevel of this SparkDataFrame.
+#'
+#' @param x the SparkDataFrame to get the storageLevel.
+#'
+#' @family SparkDataFrame functions
+#' @rdname storageLevel
+#' @aliases storageLevel,SparkDataFrame-method
+#' @name storageLevel
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' persist(df, "MEMORY_AND_DISK")
+#' storageLevel(df)
+#'}
+#' @note storageLevel since 2.1.0
+setMethod("storageLevel",
+ signature(x = "SparkDataFrame"),
+ function(x) {
+ storageLevelToString(callJMethod(x@sdf, "storageLevel"))
+ })
+
#' Repartition
#'
#' The following options for repartition are possible:
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 6cd0704003..0f1162fec1 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -261,7 +261,7 @@ setMethod("persistRDD",
#' cache(rdd) # rdd@@env$isCached == TRUE
#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
#'}
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @aliases unpersist,RDD-method
#' @noRd
setMethod("unpersistRDD",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5549cd7cac..4569fe4890 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")
#' @export
setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })
+# @rdname storageLevel
+# @export
+setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") })
+
#' @rdname subset
#' @export
setGeneric("subset", function(x, ...) { standardGeneric("subset") })
@@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
#' @export
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index fa8bb0f79c..c4e78cbb80 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}
+storageLevelToString <- function(levelObj) {
+ useDisk <- callJMethod(levelObj, "useDisk")
+ useMemory <- callJMethod(levelObj, "useMemory")
+ useOffHeap <- callJMethod(levelObj, "useOffHeap")
+ deserialized <- callJMethod(levelObj, "deserialized")
+ replication <- callJMethod(levelObj, "replication")
+ shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "NONE"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "DISK_ONLY"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "DISK_ONLY_2"
+ } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+ "MEMORY_ONLY"
+ } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+ "MEMORY_ONLY_2"
+ } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "MEMORY_ONLY_SER"
+ } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "MEMORY_ONLY_SER_2"
+ } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+ "MEMORY_AND_DISK"
+ } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+ "MEMORY_AND_DISK_2"
+ } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "MEMORY_AND_DISK_SER"
+ } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "MEMORY_AND_DISK_SER_2"
+ } else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
+ "OFF_HEAP"
+ } else {
+ NULL
+ }
+ fullInfo <- callJMethod(levelObj, "toString")
+ if (is.null(shortName)) {
+ fullInfo
+ } else {
+ paste(shortName, "-", fullInfo)
+ }
+}
+
# Utility function for functions where an argument needs to be integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
numToInt <- function(num) {
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index e77dbde44e..9289db57b6 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -796,7 +796,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
expect_false(collectRDD(second)[[3]]$testCol)
})
-test_that("cache(), persist(), and unpersist() on a DataFrame", {
+test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
df <- read.json(jsonPath)
expect_false(df@env$isCached)
cache(df)
@@ -808,6 +808,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
persist(df, "MEMORY_AND_DISK")
expect_true(df@env$isCached)
+ expect_equal(storageLevel(df),
+ "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
+
unpersist(df)
expect_false(df@env$isCached)