-rw-r--r--  R/pkg/NAMESPACE                             1
-rw-r--r--  R/pkg/R/DataFrame.R                        23
-rw-r--r--  R/pkg/R/RDD.R                              30
-rw-r--r--  R/pkg/R/generics.R                          8
-rw-r--r--  R/pkg/R/pairRDD.R                           4
-rw-r--r--  R/pkg/inst/tests/testthat/test_rdd.R       10
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  14
7 files changed, 59 insertions, 31 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index caa1c3b91b..7ff6e9a9d3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -95,6 +95,7 @@ exportMethods("arrange",
"freqItems",
"gapply",
"gapplyCollect",
+ "getNumPartitions",
"group_by",
"groupBy",
"head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0a1012283e..523343ea9f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3428,3 +3428,26 @@ setMethod("randomSplit",
}
sapply(sdfs, dataFrame)
})
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+ signature(x = "SparkDataFrame"),
+ function(x) {
+ callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+ })
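
# Example (not part of the patch): a minimal usage sketch of the new
# SparkDataFrame method, mirroring the roxygen example above; assumes a
# local Spark session.
sparkR.session()
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)   # 2; queries the JVM-side RDD via callJMethod, as defined above
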
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 0f1162fec1..91bab332c2 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
.Deprecated("getNumPartitions")
- getNumPartitions(x)
+ getNumPartitionsRDD(x)
})
#' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
- collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+ collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
})
#' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
- numPartitions <- getNumPartitions(x)
+ numPartitions <- getNumPartitionsRDD(x)
serializedModeRDD <- getSerializedMode(x)
# TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
#' @noRd
setMethod("distinctRDD",
signature(x = "RDD"),
- function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
- if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+ if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
- function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})
@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
- numPartitions <- getNumPartitions(newRdd)
+ numPartitions <- getNumPartitionsRDD(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)
while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
- n <- getNumPartitions(x)
+ n <- getNumPartitionsRDD(x)
partitionFunc <- function(partIndex, part) {
mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
- n <- getNumPartitions(x)
+ n <- getNumPartitionsRDD(x)
if (n > 1) {
nums <- collectRDD(lapplyPartition(x,
function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
- n1 <- getNumPartitions(x)
- n2 <- getNumPartitions(other)
+ n1 <- getNumPartitionsRDD(x)
+ n2 <- getNumPartitionsRDD(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })
@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
- nPart <- sapply(rrdds, getNumPartitions)
+ nPart <- sapply(rrdds, getNumPartitionsRDD)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
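
# Example (not part of the patch): the RDD-facing name stays internal (@noRd),
# so outside the package it is reachable only via ':::'. A sketch, assuming an
# existing SparkContext handle 'sc' as in the test setup below.
rdd <- SparkR:::parallelize(sc, 1:10, 2L)
SparkR:::getNumPartitionsRDD(rdd)   # 2
SparkR:::numPartitions(rdd)         # also 2, after a deprecation warning
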
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 433c16640c..0307bac349 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
# @export
setGeneric("name", function(x) { standardGeneric("name") })
-# @rdname getNumPartitions
+# @rdname getNumPartitionsRDD
# @export
-setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })
# @rdname getNumPartitions
# @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
#' @export
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })
+# @rdname getNumPartitions
+# @export
+setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+
#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 4dee3245f9..8fa21be307 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -780,7 +780,7 @@ setMethod("cogroup",
#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
- function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rangeBounds <- list()
if (numPartitions > 1) {
@@ -850,7 +850,7 @@ setMethod("sortByKey",
#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
filterFunction <- function(elem) {
iters <- elem[[2]]
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 2c41a6b075..ceb31bd896 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -29,8 +29,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)
test_that("get number of partitions in RDD", {
- expect_equal(getNumPartitions(rdd), 2)
- expect_equal(getNumPartitions(intRdd), 2)
+ expect_equal(getNumPartitionsRDD(rdd), 2)
+ expect_equal(getNumPartitionsRDD(intRdd), 2)
})
test_that("first on RDD", {
@@ -305,18 +305,18 @@ test_that("repartition/coalesce on RDDs", {
# repartition
r1 <- repartitionRDD(rdd, 2)
- expect_equal(getNumPartitions(r1), 2L)
+ expect_equal(getNumPartitionsRDD(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)
r2 <- repartitionRDD(rdd, 6)
- expect_equal(getNumPartitions(r2), 6L)
+ expect_equal(getNumPartitionsRDD(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)
# coalesce
r3 <- coalesce(rdd, 1)
- expect_equal(getNumPartitions(r3), 1L)
+ expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
})
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index aaa8fb498c..417a03ff61 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,18 +196,18 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))
- expect_equal(getNumPartitions(toRDD(df)), 1)
+ expect_equal(getNumPartitions(df), 1)
df <- as.DataFrame(cars, numPartitions = 2)
- expect_equal(getNumPartitions(toRDD(df)), 2)
+ expect_equal(getNumPartitions(df), 2)
df <- createDataFrame(cars, numPartitions = 3)
- expect_equal(getNumPartitions(toRDD(df)), 3)
+ expect_equal(getNumPartitions(df), 3)
# validate limit by num of rows
df <- createDataFrame(cars, numPartitions = 60)
- expect_equal(getNumPartitions(toRDD(df)), 50)
+ expect_equal(getNumPartitions(df), 50)
# validate when 1 < (length(coll) / numSlices) << length(coll)
df <- createDataFrame(cars, numPartitions = 20)
- expect_equal(getNumPartitions(toRDD(df)), 20)
+ expect_equal(getNumPartitions(df), 20)
df <- as.DataFrame(data.frame(0))
expect_is(df, "SparkDataFrame")
@@ -215,7 +215,7 @@ test_that("create DataFrame from RDD", {
expect_is(df, "SparkDataFrame")
df <- as.DataFrame(data.frame(0), numPartitions = 2)
# no data to partition, goes to 1
- expect_equal(getNumPartitions(toRDD(df)), 1)
+ expect_equal(getNumPartitions(df), 1)
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
@@ -234,7 +234,7 @@ test_that("createDataFrame uses files for large objects", {
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
- expect_equal(getNumPartitions(toRDD(df)), 3)
+ expect_equal(getNumPartitions(df), 3)
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
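
# Example (not part of the patch): partition-count semantics pinned down by
# the tests above; numPartitions acts as an upper bound, capped by the row
# count of the local data.
df <- createDataFrame(cars, numPartitions = 60)
getNumPartitions(df)   # 50: cars has only 50 rows
df <- as.DataFrame(data.frame(0), numPartitions = 2)
getNumPartitions(df)   # 1: a single row cannot be split (the "no data to partition" case above)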