From fb0a8a8dd7e8985676a846684b956e2d988875c6 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 26 Oct 2016 13:26:43 -0700 Subject: [SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR ## What changes were proposed in this pull request? Add storageLevel to DataFrame for SparkR. This is similar to this PR: https://github.com/apache/spark/pull/13780 but in R I do not create a class for `StorageLevel`; instead I add a helper function `storageLevelToString` ## How was this patch tested? A test was added. Author: WeichenXu Closes #15516 from WeichenXu123/storageLevel_df_r. --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 28 ++++++++++++++++++++- R/pkg/R/RDD.R | 2 +- R/pkg/R/generics.R | 6 ++++- R/pkg/R/utils.R | 41 +++++++++++++++++++++++++++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 +++- 6 files changed, 79 insertions(+), 4 deletions(-) (limited to 'R/pkg') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 8718185171..eb314f4718 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -124,6 +124,7 @@ exportMethods("arrange", "selectExpr", "show", "showDF", + "storageLevel", "subset", "summarize", "summary", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b6ce838969..be34e4b32f 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -633,7 +633,7 @@ setMethod("persist", #' @param ... further arguments to be passed to or from other methods. #' #' @family SparkDataFrame functions -#' @rdname unpersist-methods +#' @rdname unpersist #' @aliases unpersist,SparkDataFrame-method #' @name unpersist #' @export @@ -654,6 +654,32 @@ setMethod("unpersist", x }) +#' StorageLevel +#' +#' Get the storage level of this SparkDataFrame. +#' +#' @param x the SparkDataFrame to get the storageLevel. 
+#' +#' @family SparkDataFrame functions +#' @rdname storageLevel +#' @aliases storageLevel,SparkDataFrame-method +#' @name storageLevel +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' persist(df, "MEMORY_AND_DISK") +#' storageLevel(df) +#'} +#' @note storageLevel since 2.1.0 +setMethod("storageLevel", + signature(x = "SparkDataFrame"), + function(x) { + storageLevelToString(callJMethod(x@sdf, "storageLevel")) + }) + #' Repartition #' #' The following options for repartition are possible: diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 6cd0704003..0f1162fec1 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -261,7 +261,7 @@ setMethod("persistRDD", #' cache(rdd) # rdd@@env$isCached == TRUE #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE #'} -#' @rdname unpersist-methods +#' @rdname unpersist #' @aliases unpersist,RDD-method #' @noRd setMethod("unpersistRDD", diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5549cd7cac..4569fe4890 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr") #' @export setGeneric("showDF", function(x, ...) { standardGeneric("showDF") }) +# @rdname storageLevel +# @export +setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") }) + #' @rdname subset #' @export setGeneric("subset", function(x, ...) { standardGeneric("subset") }) @@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") }) #' @export setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") }) -#' @rdname unpersist-methods +#' @rdname unpersist #' @export setGeneric("unpersist", function(x, ...) 
{ standardGeneric("unpersist") }) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index fa8bb0f79c..c4e78cbb80 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY", "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP")) } +storageLevelToString <- function(levelObj) { + useDisk <- callJMethod(levelObj, "useDisk") + useMemory <- callJMethod(levelObj, "useMemory") + useOffHeap <- callJMethod(levelObj, "useOffHeap") + deserialized <- callJMethod(levelObj, "deserialized") + replication <- callJMethod(levelObj, "replication") + shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) { + "NONE" + } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) { + "DISK_ONLY" + } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) { + "DISK_ONLY_2" + } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) { + "MEMORY_ONLY" + } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) { + "MEMORY_ONLY_2" + } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) { + "MEMORY_ONLY_SER" + } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) { + "MEMORY_ONLY_SER_2" + } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) { + "MEMORY_AND_DISK" + } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) { + "MEMORY_AND_DISK_2" + } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) { + "MEMORY_AND_DISK_SER" + } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) { + "MEMORY_AND_DISK_SER_2" + } else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) { + "OFF_HEAP" + } else { + NULL + } + fullInfo <- callJMethod(levelObj, "toString") + if (is.null(shortName)) { + 
fullInfo + } else { + paste(shortName, "-", fullInfo) + } +} + # Utility function for functions where an argument needs to be integer but we want to allow # the user to type (for example) `5` instead of `5L` to avoid a confusing error message. numToInt <- function(num) { diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index e77dbde44e..9289db57b6 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -796,7 +796,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v expect_false(collectRDD(second)[[3]]$testCol) }) -test_that("cache(), persist(), and unpersist() on a DataFrame", { +test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", { df <- read.json(jsonPath) expect_false(df@env$isCached) cache(df) @@ -808,6 +808,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", { persist(df, "MEMORY_AND_DISK") expect_true(df@env$isCached) + expect_equal(storageLevel(df), + "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)") + unpersist(df) expect_false(df@env$isCached) -- cgit v1.2.3