about | summary | refs | log | tree | commit | diff
path: root/R/pkg/inst/tests/testthat
diff options
context:
space:
mode:
author Felix Cheung <felixcheung_m@hotmail.com> 2016-08-16 11:19:18 -0700
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu> 2016-08-16 11:19:18 -0700
commit c34b546d674ce186f13d9999b97977bc281cfedf (patch)
tree fe6bee26bcc0b4b292691ca5518a4731fa217fbd /R/pkg/inst/tests/testthat
parent d37ea3c09c054f2cc1305b2520ff46b2c0e58704 (diff)
download spark-c34b546d674ce186f13d9999b97977bc281cfedf.tar.gz
spark-c34b546d674ce186f13d9999b97977bc281cfedf.tar.bz2
spark-c34b546d674ce186f13d9999b97977bc281cfedf.zip
[SPARK-16519][SPARKR] Handle SparkR RDD generics that create warnings in R CMD check
## What changes were proposed in this pull request?

Rename RDD functions for now to avoid CRAN check warnings.
Some RDD functions share generics with DataFrame functions (hence the problem), so after the renames we need to add new generics, for now.

## How was this patch tested?

unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #14626 from felixcheung/rrddfunctions.
Diffstat (limited to 'R/pkg/inst/tests/testthat')
-rw-r--r-- R/pkg/inst/tests/testthat/test_binaryFile.R 8
-rw-r--r-- R/pkg/inst/tests/testthat/test_binary_function.R 18
-rw-r--r-- R/pkg/inst/tests/testthat/test_broadcast.R 4
-rw-r--r-- R/pkg/inst/tests/testthat/test_context.R 6
-rw-r--r-- R/pkg/inst/tests/testthat/test_includePackage.R 4
-rw-r--r-- R/pkg/inst/tests/testthat/test_parallelize_collect.R 26
-rw-r--r-- R/pkg/inst/tests/testthat/test_rdd.R 172
-rw-r--r-- R/pkg/inst/tests/testthat/test_shuffle.R 34
-rw-r--r-- R/pkg/inst/tests/testthat/test_sparkSQL.R 28
-rw-r--r-- R/pkg/inst/tests/testthat/test_take.R 32
-rw-r--r-- R/pkg/inst/tests/testthat/test_textFile.R 26
-rw-r--r-- R/pkg/inst/tests/testthat/test_utils.R 6
12 files changed, 182 insertions, 182 deletions
diff --git a/R/pkg/inst/tests/testthat/test_binaryFile.R b/R/pkg/inst/tests/testthat/test_binaryFile.R
index 56ac8eb728..b5c279e315 100644
--- a/R/pkg/inst/tests/testthat/test_binaryFile.R
+++ b/R/pkg/inst/tests/testthat/test_binaryFile.R
@@ -31,7 +31,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
rdd <- textFile(sc, fileName1, 1)
saveAsObjectFile(rdd, fileName2)
rdd <- objectFile(sc, fileName2)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName1)
unlink(fileName2, recursive = TRUE)
@@ -44,7 +44,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
rdd <- parallelize(sc, l, 1)
saveAsObjectFile(rdd, fileName)
rdd <- objectFile(sc, fileName)
- expect_equal(collect(rdd), l)
+ expect_equal(collectRDD(rdd), l)
unlink(fileName, recursive = TRUE)
})
@@ -64,7 +64,7 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
saveAsObjectFile(counts, fileName2)
counts <- objectFile(sc, fileName2)
- output <- collect(counts)
+ output <- collectRDD(counts)
expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
list("is", 2))
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
@@ -83,7 +83,7 @@ test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
saveAsObjectFile(rdd2, fileName2)
rdd <- objectFile(sc, c(fileName1, fileName2))
- expect_equal(count(rdd), 2)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName1, recursive = TRUE)
unlink(fileName2, recursive = TRUE)
diff --git a/R/pkg/inst/tests/testthat/test_binary_function.R b/R/pkg/inst/tests/testthat/test_binary_function.R
index ae7abe20cc..59cb2e6204 100644
--- a/R/pkg/inst/tests/testthat/test_binary_function.R
+++ b/R/pkg/inst/tests/testthat/test_binary_function.R
@@ -29,7 +29,7 @@ rdd <- parallelize(sc, nums, 2L)
mockFile <- c("Spark is pretty.", "Spark is awesome.")
test_that("union on two RDDs", {
- actual <- collect(unionRDD(rdd, rdd))
+ actual <- collectRDD(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
@@ -37,13 +37,13 @@ test_that("union on two RDDs", {
text.rdd <- textFile(sc, fileName)
union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
+ actual <- collectRDD(union.rdd)
expect_equal(actual, c(as.list(nums), mockFile))
expect_equal(getSerializedMode(union.rdd), "byte")
rdd <- map(text.rdd, function(x) {x})
union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
+ actual <- collectRDD(union.rdd)
expect_equal(actual, as.list(c(mockFile, mockFile)))
expect_equal(getSerializedMode(union.rdd), "byte")
@@ -54,14 +54,14 @@ test_that("cogroup on two RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
+ actual <- collectRDD(cogroup.rdd)
expect_equal(actual,
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
+ actual <- collectRDD(cogroup.rdd)
expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
expect_equal(sortKeyValueList(actual),
@@ -72,7 +72,7 @@ test_that("zipPartitions() on RDDs", {
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
- actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+ actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))
@@ -82,19 +82,19 @@ test_that("zipPartitions() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipPartitions(rdd, rdd,
+ actual <- collectRDD(zipPartitions(rdd, rdd,
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
expected <- list(paste(mockFile, mockFile, sep = "\n"))
expect_equal(actual, expected)
rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipPartitions(rdd1, rdd,
+ actual <- collectRDD(zipPartitions(rdd1, rdd,
func = function(x, y) { list(x + nchar(y)) }))
expected <- list(0:1 + nchar(mockFile))
expect_equal(actual, expected)
rdd <- map(rdd, function(x) { x })
- actual <- collect(zipPartitions(rdd, rdd1,
+ actual <- collectRDD(zipPartitions(rdd, rdd1,
func = function(x, y) { list(y + nchar(x)) }))
expect_equal(actual, expected)
diff --git a/R/pkg/inst/tests/testthat/test_broadcast.R b/R/pkg/inst/tests/testthat/test_broadcast.R
index c7fefb5cf9..65f204d096 100644
--- a/R/pkg/inst/tests/testthat/test_broadcast.R
+++ b/R/pkg/inst/tests/testthat/test_broadcast.R
@@ -32,7 +32,7 @@ test_that("using broadcast variable", {
useBroadcast <- function(x) {
sum(SparkR:::value(randomMatBr) * x)
}
- actual <- collect(lapply(rrdd, useBroadcast))
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})
@@ -43,7 +43,7 @@ test_that("without using broadcast variable", {
useBroadcast <- function(x) {
sum(randomMat * x)
}
- actual <- collect(lapply(rrdd, useBroadcast))
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 8bd134a58d..1ab7f319df 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -58,7 +58,7 @@ test_that("repeatedly starting and stopping SparkR", {
for (i in 1:4) {
sc <- suppressWarnings(sparkR.init())
rdd <- parallelize(sc, 1:20, 2L)
- expect_equal(count(rdd), 20)
+ expect_equal(countRDD(rdd), 20)
suppressWarnings(sparkR.stop())
}
})
@@ -90,8 +90,8 @@ test_that("rdd GC across sparkR.stop", {
rm(rdd2)
gc()
- count(rdd3)
- count(rdd4)
+ countRDD(rdd3)
+ countRDD(rdd4)
sparkR.session.stop()
})
diff --git a/R/pkg/inst/tests/testthat/test_includePackage.R b/R/pkg/inst/tests/testthat/test_includePackage.R
index ca2b900572..563ea298c2 100644
--- a/R/pkg/inst/tests/testthat/test_includePackage.R
+++ b/R/pkg/inst/tests/testthat/test_includePackage.R
@@ -37,7 +37,7 @@ test_that("include inside function", {
}
data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)
+ actual <- collectRDD(data)
}
})
@@ -53,7 +53,7 @@ test_that("use include package", {
includePackage(sc, plyr)
data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)
+ actual <- collectRDD(data)
}
})
diff --git a/R/pkg/inst/tests/testthat/test_parallelize_collect.R b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
index 959d7ab9e6..55972e1ba4 100644
--- a/R/pkg/inst/tests/testthat/test_parallelize_collect.R
+++ b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
@@ -67,22 +67,22 @@ test_that("parallelize() on simple vectors and lists returns an RDD", {
test_that("collect(), following a parallelize(), gives back the original collections", {
numVectorRDD <- parallelize(jsc, numVector, 10)
- expect_equal(collect(numVectorRDD), as.list(numVector))
+ expect_equal(collectRDD(numVectorRDD), as.list(numVector))
numListRDD <- parallelize(jsc, numList, 1)
numListRDD2 <- parallelize(jsc, numList, 4)
- expect_equal(collect(numListRDD), as.list(numList))
- expect_equal(collect(numListRDD2), as.list(numList))
+ expect_equal(collectRDD(numListRDD), as.list(numList))
+ expect_equal(collectRDD(numListRDD2), as.list(numList))
strVectorRDD <- parallelize(jsc, strVector, 2)
strVectorRDD2 <- parallelize(jsc, strVector, 3)
- expect_equal(collect(strVectorRDD), as.list(strVector))
- expect_equal(collect(strVectorRDD2), as.list(strVector))
+ expect_equal(collectRDD(strVectorRDD), as.list(strVector))
+ expect_equal(collectRDD(strVectorRDD2), as.list(strVector))
strListRDD <- parallelize(jsc, strList, 4)
strListRDD2 <- parallelize(jsc, strList, 1)
- expect_equal(collect(strListRDD), as.list(strList))
- expect_equal(collect(strListRDD2), as.list(strList))
+ expect_equal(collectRDD(strListRDD), as.list(strList))
+ expect_equal(collectRDD(strListRDD2), as.list(strList))
})
test_that("regression: collect() following a parallelize() does not drop elements", {
@@ -90,7 +90,7 @@ test_that("regression: collect() following a parallelize() does not drop element
collLen <- 10
numPart <- 6
expected <- runif(collLen)
- actual <- collect(parallelize(jsc, expected, numPart))
+ actual <- collectRDD(parallelize(jsc, expected, numPart))
expect_equal(actual, as.list(expected))
})
@@ -99,14 +99,14 @@ test_that("parallelize() and collect() work for lists of pairs (pairwise data)",
numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
- expect_equal(collect(numPairsRDDD1), numPairs)
- expect_equal(collect(numPairsRDDD2), numPairs)
- expect_equal(collect(numPairsRDDD3), numPairs)
+ expect_equal(collectRDD(numPairsRDDD1), numPairs)
+ expect_equal(collectRDD(numPairsRDDD2), numPairs)
+ expect_equal(collectRDD(numPairsRDDD3), numPairs)
# can also leave out the parameter name, if the params are supplied in order
strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
- expect_equal(collect(strPairsRDDD1), strPairs)
- expect_equal(collect(strPairsRDDD2), strPairs)
+ expect_equal(collectRDD(strPairsRDDD1), strPairs)
+ expect_equal(collectRDD(strPairsRDDD2), strPairs)
})
sparkR.session.stop()
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 508a3a7dfd..a3d66c245a 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -34,14 +34,14 @@ test_that("get number of partitions in RDD", {
})
test_that("first on RDD", {
- expect_equal(first(rdd), 1)
+ expect_equal(firstRDD(rdd), 1)
newrdd <- lapply(rdd, function(x) x + 1)
- expect_equal(first(newrdd), 2)
+ expect_equal(firstRDD(newrdd), 2)
})
test_that("count and length on RDD", {
- expect_equal(count(rdd), 10)
- expect_equal(length(rdd), 10)
+ expect_equal(countRDD(rdd), 10)
+ expect_equal(lengthRDD(rdd), 10)
})
test_that("count by values and keys", {
@@ -57,40 +57,40 @@ test_that("count by values and keys", {
test_that("lapply on RDD", {
multiples <- lapply(rdd, function(x) { 2 * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 2))
})
test_that("lapplyPartition on RDD", {
sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("mapPartitions on RDD", {
sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("flatMap() on RDDs", {
flat <- flatMap(intRdd, function(x) { list(x, x) })
- actual <- collect(flat)
+ actual <- collectRDD(flat)
expect_equal(actual, rep(intPairs, each = 2))
})
test_that("filterRDD on RDD", {
filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(2, 4, 6, 8, 10))
filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(list(1L, -1)))
# Filter out all elements.
filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list())
})
@@ -110,7 +110,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
part <- as.list(unlist(part) * partIndex + i)
})
rdd2 <- lapply(rdd2, function(x) x + x)
- actual <- collect(rdd2)
+ actual <- collectRDD(rdd2)
expected <- list(24, 24, 24, 24, 24,
168, 170, 172, 174, 176)
expect_equal(actual, expected)
@@ -126,20 +126,20 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
part <- as.list(unlist(part) * partIndex)
})
- cache(rdd2)
+ cacheRDD(rdd2)
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
- persist(rdd2, "MEMORY_AND_DISK")
+ persistRDD(rdd2, "MEMORY_AND_DISK")
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
tempDir <- tempfile(pattern = "checkpoint")
@@ -152,7 +152,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
expect_false(rdd2@env$isCheckpointed)
# make sure the data is collectable
- collect(rdd2)
+ collectRDD(rdd2)
unlink(tempDir)
})
@@ -169,21 +169,21 @@ test_that("reduce on RDD", {
test_that("lapply with dependency", {
fa <- 5
multiples <- lapply(rdd, function(x) { fa * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 5))
})
test_that("lapplyPartitionsWithIndex on RDDs", {
func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
- actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+ actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
expect_equal(actual, list(list(0, 15), list(1, 40)))
pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
mkTup <- function(partIndex, part) { list(partIndex, part) }
- actual <- collect(lapplyPartitionsWithIndex(
- partitionBy(pairsRDD, 2L, partitionByParity),
+ actual <- collectRDD(lapplyPartitionsWithIndex(
+ partitionByRDD(pairsRDD, 2L, partitionByParity),
mkTup),
FALSE)
expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
@@ -191,7 +191,7 @@ test_that("lapplyPartitionsWithIndex on RDDs", {
})
test_that("sampleRDD() on RDDs", {
- expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+ expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
})
test_that("takeSample() on RDDs", {
@@ -238,7 +238,7 @@ test_that("takeSample() on RDDs", {
test_that("mapValues() on pairwise RDDs", {
multiples <- mapValues(intRdd, function(x) { x * 2 })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expected <- lapply(intPairs, function(x) {
list(x[[1]], x[[2]] * 2)
})
@@ -247,11 +247,11 @@ test_that("mapValues() on pairwise RDDs", {
test_that("flatMapValues() on pairwise RDDs", {
l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
- actual <- collect(flatMapValues(l, function(x) { x }))
+ actual <- collectRDD(flatMapValues(l, function(x) { x }))
expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
# Generate x to x+1 for every value
- actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) }))
+ actual <- collectRDD(flatMapValues(intRdd, function(x) { x: (x + 1) }))
expect_equal(actual,
list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
@@ -273,8 +273,8 @@ test_that("reduceByKeyLocally() on PairwiseRDDs", {
test_that("distinct() on RDDs", {
nums.rep2 <- rep(1:10, 2)
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
- uniques <- distinct(rdd.rep2)
- actual <- sort(unlist(collect(uniques)))
+ uniques <- distinctRDD(rdd.rep2)
+ actual <- sort(unlist(collectRDD(uniques)))
expect_equal(actual, nums)
})
@@ -296,7 +296,7 @@ test_that("sumRDD() on RDDs", {
test_that("keyBy on RDDs", {
func <- function(x) { x * x }
keys <- keyBy(rdd, func)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})
@@ -304,12 +304,12 @@ test_that("repartition/coalesce on RDDs", {
rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
# repartition
- r1 <- repartition(rdd, 2)
+ r1 <- repartitionRDD(rdd, 2)
expect_equal(getNumPartitions(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)
- r2 <- repartition(rdd, 6)
+ r2 <- repartitionRDD(rdd, 6)
expect_equal(getNumPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)
@@ -323,12 +323,12 @@ test_that("repartition/coalesce on RDDs", {
test_that("sortBy() on RDDs", {
sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, as.list(nums))
})
@@ -380,13 +380,13 @@ test_that("aggregateRDD() on RDDs", {
test_that("zipWithUniqueId() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 3), list("c", 1),
list("d", 4), list("e", 2))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -394,13 +394,13 @@ test_that("zipWithUniqueId() on RDDs", {
test_that("zipWithIndex() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -408,35 +408,35 @@ test_that("zipWithIndex() on RDDs", {
test_that("glom() on RDD", {
rdd <- parallelize(sc, as.list(1:4), 2L)
- actual <- collect(glom(rdd))
+ actual <- collectRDD(glom(rdd))
expect_equal(actual, list(list(1, 2), list(3, 4)))
})
test_that("keys() on RDDs", {
keys <- keys(intRdd)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
})
test_that("values() on RDDs", {
values <- values(intRdd)
- actual <- collect(values)
+ actual <- collectRDD(values)
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})
test_that("pipeRDD() on RDDs", {
- actual <- collect(pipeRDD(rdd, "more"))
+ actual <- collectRDD(pipeRDD(rdd, "more"))
expected <- as.list(as.character(1:10))
expect_equal(actual, expected)
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
- actual <- collect(pipeRDD(trailed.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
expected <- list("", "1", "2", "3")
expect_equal(actual, expected)
rev.nums <- 9:0
rev.rdd <- parallelize(sc, rev.nums, 2L)
- actual <- collect(pipeRDD(rev.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
expected <- as.list(as.character(c(5:9, 0:4)))
expect_equal(actual, expected)
})
@@ -444,7 +444,7 @@ test_that("pipeRDD() on RDDs", {
test_that("zipRDD() on RDDs", {
rdd1 <- parallelize(sc, 0:4, 2)
rdd2 <- parallelize(sc, 1000:1004, 2)
- actual <- collect(zipRDD(rdd1, rdd2))
+ actual <- collectRDD(zipRDD(rdd1, rdd2))
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
@@ -453,17 +453,17 @@ test_that("zipRDD() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipRDD(rdd, rdd))
+ actual <- collectRDD(zipRDD(rdd, rdd))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipRDD(rdd1, rdd))
+ actual <- collectRDD(zipRDD(rdd1, rdd))
expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
expect_equal(actual, expected)
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(zipRDD(rdd, rdd1))
+ actual <- collectRDD(zipRDD(rdd, rdd1))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
@@ -472,7 +472,7 @@ test_that("zipRDD() on RDDs", {
test_that("cartesian() on RDDs", {
rdd <- parallelize(sc, 1:3)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(1, 1), list(1, 2), list(1, 3),
@@ -481,7 +481,7 @@ test_that("cartesian() on RDDs", {
# test case where one RDD is empty
emptyRdd <- parallelize(sc, list())
- actual <- collect(cartesian(rdd, emptyRdd))
+ actual <- collectRDD(cartesian(rdd, emptyRdd))
expect_equal(actual, list())
mockFile <- c("Spark is pretty.", "Spark is awesome.")
@@ -489,7 +489,7 @@ test_that("cartesian() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expected <- list(
list("Spark is awesome.", "Spark is pretty."),
list("Spark is awesome.", "Spark is awesome."),
@@ -498,7 +498,7 @@ test_that("cartesian() on RDDs", {
expect_equal(sortKeyValueList(actual), expected)
rdd1 <- parallelize(sc, 0:1)
- actual <- collect(cartesian(rdd1, rdd))
+ actual <- collectRDD(cartesian(rdd1, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(0, "Spark is pretty."),
@@ -507,7 +507,7 @@ test_that("cartesian() on RDDs", {
list(1, "Spark is awesome.")))
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(cartesian(rdd, rdd1))
+ actual <- collectRDD(cartesian(rdd, rdd1))
expect_equal(sortKeyValueList(actual), expected)
unlink(fileName)
@@ -518,24 +518,24 @@ test_that("subtract() on RDDs", {
rdd1 <- parallelize(sc, l)
# subtract by itself
- actual <- collect(subtract(rdd1, rdd1))
+ actual <- collectRDD(subtract(rdd1, rdd1))
expect_equal(actual, list())
# subtract by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
l)
rdd2 <- parallelize(sc, list(2, 4))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
list(1, 1, 3))
l <- list("a", "a", "b", "b", "c", "d")
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list("b", "d"))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
list("a", "a", "c"))
})
@@ -546,17 +546,17 @@ test_that("subtractByKey() on pairwise RDDs", {
rdd1 <- parallelize(sc, l)
# subtractByKey by itself
- actual <- collect(subtractByKey(rdd1, rdd1))
+ actual <- collectRDD(subtractByKey(rdd1, rdd1))
expect_equal(actual, list())
# subtractByKey by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(l))
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list("b", 4), list("b", 5)))
@@ -564,76 +564,76 @@ test_that("subtractByKey() on pairwise RDDs", {
list(2, 5), list(1, 2))
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list(2, 4), list(2, 5)))
})
test_that("intersection() on RDDs", {
# intersection with self
- actual <- collect(intersection(rdd, rdd))
+ actual <- collectRDD(intersection(rdd, rdd))
expect_equal(sort(as.integer(actual)), nums)
# intersection with an empty RDD
emptyRdd <- parallelize(sc, list())
- actual <- collect(intersection(rdd, emptyRdd))
+ actual <- collectRDD(intersection(rdd, emptyRdd))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
- actual <- collect(intersection(rdd1, rdd2))
+ actual <- collectRDD(intersection(rdd1, rdd2))
expect_equal(sort(as.integer(actual)), 1:3)
})
test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
})
test_that("leftOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
@@ -642,26 +642,26 @@ test_that("leftOuterJoin() on pairwise RDDs", {
test_that("rightOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})
@@ -669,14 +669,14 @@ test_that("rightOuterJoin() on pairwise RDDs", {
test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
@@ -684,14 +684,14 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
@@ -700,21 +700,21 @@ test_that("fullOuterJoin() on pairwise RDDs", {
test_that("sortByKey() on pairwise RDDs", {
numPairsRdd <- map(rdd, function(x) { list (x, x) })
sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
numPairs <- lapply(nums, function(x) { list (x, x) })
expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
sortedRdd2 <- sortByKey(numPairsRdd2)
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, numPairs)
# sort by string keys
l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
rdd3 <- parallelize(sc, l, 2L)
sortedRdd3 <- sortByKey(rdd3)
- actual <- collect(sortedRdd3)
+ actual <- collectRDD(sortedRdd3)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# test on the boundary cases
@@ -722,27 +722,27 @@ test_that("sortByKey() on pairwise RDDs", {
# boundary case 1: the RDD to be sorted has only 1 partition
rdd4 <- parallelize(sc, l, 1L)
sortedRdd4 <- sortByKey(rdd4)
- actual <- collect(sortedRdd4)
+ actual <- collectRDD(sortedRdd4)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 2: the sorted RDD has only 1 partition
rdd5 <- parallelize(sc, l, 2L)
sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
- actual <- collect(sortedRdd5)
+ actual <- collectRDD(sortedRdd5)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 3: the RDD to be sorted has only 1 element
l2 <- list(list("a", 1))
rdd6 <- parallelize(sc, l2, 2L)
sortedRdd6 <- sortByKey(rdd6)
- actual <- collect(sortedRdd6)
+ actual <- collectRDD(sortedRdd6)
expect_equal(actual, l2)
# boundary case 4: the RDD to be sorted has 0 element
l3 <- list()
rdd7 <- parallelize(sc, l3, 2L)
sortedRdd7 <- sortByKey(rdd7)
- actual <- collect(sortedRdd7)
+ actual <- collectRDD(sortedRdd7)
expect_equal(actual, l3)
})
@@ -766,7 +766,7 @@ test_that("collectAsMap() on a pairwise RDD", {
test_that("show()", {
rdd <- parallelize(sc, list(1:10))
- expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+ expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})
test_that("sampleByKey() on pairwise RDDs", {
diff --git a/R/pkg/inst/tests/testthat/test_shuffle.R b/R/pkg/inst/tests/testthat/test_shuffle.R
index 2586056773..d38efab0fd 100644
--- a/R/pkg/inst/tests/testthat/test_shuffle.R
+++ b/R/pkg/inst/tests/testthat/test_shuffle.R
@@ -39,7 +39,7 @@ strListRDD <- parallelize(sc, strList, 4)
test_that("groupByKey for integers", {
grouped <- groupByKey(intRdd, 2L)
- actual <- collect(grouped)
+ actual <- collectRDD(grouped)
expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -48,7 +48,7 @@ test_that("groupByKey for integers", {
test_that("groupByKey for doubles", {
grouped <- groupByKey(doubleRdd, 2L)
- actual <- collect(grouped)
+ actual <- collectRDD(grouped)
expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -57,7 +57,7 @@ test_that("groupByKey for doubles", {
test_that("reduceByKey for ints", {
reduced <- reduceByKey(intRdd, "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -65,7 +65,7 @@ test_that("reduceByKey for ints", {
test_that("reduceByKey for doubles", {
reduced <- reduceByKey(doubleRdd, "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -74,7 +74,7 @@ test_that("reduceByKey for doubles", {
test_that("combineByKey for ints", {
reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -82,7 +82,7 @@ test_that("combineByKey for ints", {
test_that("combineByKey for doubles", {
reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -94,7 +94,7 @@ test_that("combineByKey for characters", {
list("other", 3L), list("max", 4L)), 2L)
reduced <- combineByKey(stringKeyRDD,
function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -109,7 +109,7 @@ test_that("aggregateByKey", {
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
- actual <- collect(aggregatedRDD)
+ actual <- collectRDD(aggregatedRDD)
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -122,7 +122,7 @@ test_that("aggregateByKey", {
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
- actual <- collect(aggregatedRDD)
+ actual <- collectRDD(aggregatedRDD)
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -132,7 +132,7 @@ test_that("foldByKey", {
# test foldByKey for int keys
folded <- foldByKey(intRdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -140,7 +140,7 @@ test_that("foldByKey", {
# test foldByKey for double keys
folded <- foldByKey(doubleRdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -151,7 +151,7 @@ test_that("foldByKey", {
stringKeyRDD <- parallelize(sc, stringKeyPairs)
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list("b", 101), list("a", 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -159,14 +159,14 @@ test_that("foldByKey", {
# test foldByKey for empty pair RDD
rdd <- parallelize(sc, list())
folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list()
expect_equal(actual, expected)
# test foldByKey for RDD with only 1 pair
rdd <- parallelize(sc, list(list(1, 1)))
folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(1, 1))
expect_equal(actual, expected)
})
@@ -175,7 +175,7 @@ test_that("partitionBy() partitions data correctly", {
# Partition by magnitude
partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
- resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
+ resultRDD <- partitionByRDD(numPairsRdd, 2L, partitionByMagnitude)
expected_first <- list(list(1, 100), list(2, 200)) # key less than 3
expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key greater than or equal 3
@@ -191,7 +191,7 @@ test_that("partitionBy works with dependencies", {
partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
# Partition by parity
- resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
+ resultRDD <- partitionByRDD(numPairsRdd, numPartitions = 2L, partitionByParity)
# keys even; 100 %% 2 == 0
expected_first <- list(list(2, 200), list(4, -1))
@@ -208,7 +208,7 @@ test_that("test partitionBy with string keys", {
words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
wordCount <- lapply(words, function(word) { list(word, 1L) })
- resultRDD <- partitionBy(wordCount, 2L)
+ resultRDD <- partitionByRDD(wordCount, 2L)
expected_first <- list(list("Dexter", 1), list("Dexter", 1))
expected_second <- list(list("and", 1), list("and", 1))
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 39ed4febe5..3ccb8b6d77 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -490,7 +490,7 @@ test_that("read/write json files", {
test_that("jsonRDD() on a RDD with json string", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
rdd <- parallelize(sc, mockLines)
- expect_equal(count(rdd), 3)
+ expect_equal(countRDD(rdd), 3)
df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
@@ -582,7 +582,7 @@ test_that("toRDD() returns an RRDD", {
df <- read.json(jsonPath)
testRDD <- toRDD(df)
expect_is(testRDD, "RDD")
- expect_equal(count(testRDD), 3)
+ expect_equal(countRDD(testRDD), 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
@@ -592,7 +592,7 @@ test_that("union on two RDDs created from DataFrames returns an RRDD", {
unioned <- unionRDD(RDD1, RDD2)
expect_is(unioned, "RDD")
expect_equal(getSerializedMode(unioned), "byte")
- expect_equal(collect(unioned)[[2]]$name, "Andy")
+ expect_equal(collectRDD(unioned)[[2]]$name, "Andy")
})
test_that("union on mixed serialization types correctly returns a byte RRDD", {
@@ -614,14 +614,14 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
unionByte <- unionRDD(rdd, dfRDD)
expect_is(unionByte, "RDD")
expect_equal(getSerializedMode(unionByte), "byte")
- expect_equal(collect(unionByte)[[1]], 1)
- expect_equal(collect(unionByte)[[12]]$name, "Andy")
+ expect_equal(collectRDD(unionByte)[[1]], 1)
+ expect_equal(collectRDD(unionByte)[[12]]$name, "Andy")
unionString <- unionRDD(textRDD, dfRDD)
expect_is(unionString, "RDD")
expect_equal(getSerializedMode(unionString), "byte")
- expect_equal(collect(unionString)[[1]], "Michael")
- expect_equal(collect(unionString)[[5]]$name, "Andy")
+ expect_equal(collectRDD(unionString)[[1]], "Michael")
+ expect_equal(collectRDD(unionString)[[5]]$name, "Andy")
})
test_that("objectFile() works with row serialization", {
@@ -633,7 +633,7 @@ test_that("objectFile() works with row serialization", {
expect_is(objectIn, "RDD")
expect_equal(getSerializedMode(objectIn), "byte")
- expect_equal(collect(objectIn)[[2]]$age, 30)
+ expect_equal(collectRDD(objectIn)[[2]]$age, 30)
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
@@ -643,7 +643,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
row
})
expect_is(testRDD, "RDD")
- collected <- collect(testRDD)
+ collected <- collectRDD(testRDD)
expect_equal(collected[[1]]$name, "Michael")
expect_equal(collected[[2]]$newCol, 35)
})
@@ -715,10 +715,10 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
row
})
expect_is(second, "RDD")
- expect_equal(count(second), 3)
- expect_equal(collect(second)[[2]]$age, 35)
- expect_true(collect(second)[[2]]$testCol)
- expect_false(collect(second)[[3]]$testCol)
+ expect_equal(countRDD(second), 3)
+ expect_equal(collectRDD(second)[[2]]$age, 35)
+ expect_true(collectRDD(second)[[2]]$testCol)
+ expect_false(collectRDD(second)[[3]]$testCol)
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
@@ -1608,7 +1608,7 @@ test_that("toJSON() returns an RDD of the correct values", {
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(getSerializedMode(testRDD), "string")
- expect_equal(collect(testRDD)[[1]], mockLines[1])
+ expect_equal(collectRDD(testRDD)[[1]], mockLines[1])
})
test_that("showDF()", {
diff --git a/R/pkg/inst/tests/testthat/test_take.R b/R/pkg/inst/tests/testthat/test_take.R
index 07f00c9915..aaa532856c 100644
--- a/R/pkg/inst/tests/testthat/test_take.R
+++ b/R/pkg/inst/tests/testthat/test_take.R
@@ -36,34 +36,34 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
test_that("take() gives back the original elements in correct count and order", {
numVectorRDD <- parallelize(sc, numVector, 10)
# case: number of elements to take is less than the size of the first partition
- expect_equal(take(numVectorRDD, 1), as.list(head(numVector, n = 1)))
+ expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1)))
# case: number of elements to take is the same as the size of the first partition
- expect_equal(take(numVectorRDD, 11), as.list(head(numVector, n = 11)))
+ expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11)))
# case: number of elements to take is greater than all elements
- expect_equal(take(numVectorRDD, length(numVector)), as.list(numVector))
- expect_equal(take(numVectorRDD, length(numVector) + 1), as.list(numVector))
+ expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector))
+ expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector))
numListRDD <- parallelize(sc, numList, 1)
numListRDD2 <- parallelize(sc, numList, 4)
- expect_equal(take(numListRDD, 3), take(numListRDD2, 3))
- expect_equal(take(numListRDD, 5), take(numListRDD2, 5))
- expect_equal(take(numListRDD, 1), as.list(head(numList, n = 1)))
- expect_equal(take(numListRDD2, 999), numList)
+ expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3))
+ expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5))
+ expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1)))
+ expect_equal(takeRDD(numListRDD2, 999), numList)
strVectorRDD <- parallelize(sc, strVector, 2)
strVectorRDD2 <- parallelize(sc, strVector, 3)
- expect_equal(take(strVectorRDD, 4), as.list(strVector))
- expect_equal(take(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
+ expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector))
+ expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
strListRDD <- parallelize(sc, strList, 4)
strListRDD2 <- parallelize(sc, strList, 1)
- expect_equal(take(strListRDD, 3), as.list(head(strList, n = 3)))
- expect_equal(take(strListRDD2, 1), as.list(head(strList, n = 1)))
+ expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3)))
+ expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1)))
- expect_equal(length(take(strListRDD, 0)), 0)
- expect_equal(length(take(strVectorRDD, 0)), 0)
- expect_equal(length(take(numListRDD, 0)), 0)
- expect_equal(length(take(numVectorRDD, 0)), 0)
+ expect_equal(length(takeRDD(strListRDD, 0)), 0)
+ expect_equal(length(takeRDD(strVectorRDD, 0)), 0)
+ expect_equal(length(takeRDD(numListRDD, 0)), 0)
+ expect_equal(length(takeRDD(numVectorRDD, 0)), 0)
})
sparkR.session.stop()
diff --git a/R/pkg/inst/tests/testthat/test_textFile.R b/R/pkg/inst/tests/testthat/test_textFile.R
index b7dcbe472a..3b466066e9 100644
--- a/R/pkg/inst/tests/testthat/test_textFile.R
+++ b/R/pkg/inst/tests/testthat/test_textFile.R
@@ -29,8 +29,8 @@ test_that("textFile() on a local file returns an RDD", {
rdd <- textFile(sc, fileName)
expect_is(rdd, "RDD")
- expect_true(count(rdd) > 0)
- expect_equal(count(rdd), 2)
+ expect_true(countRDD(rdd) > 0)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName)
})
@@ -40,7 +40,7 @@ test_that("textFile() followed by a collect() returns the same content", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName)
})
@@ -55,7 +55,7 @@ test_that("textFile() word count works as expected", {
wordCount <- lapply(words, function(word) { list(word, 1L) })
counts <- reduceByKey(wordCount, "+", 2L)
- output <- collect(counts)
+ output <- collectRDD(counts)
expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
list("Spark", 2))
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
@@ -72,7 +72,7 @@ test_that("several transformations on RDD created by textFile()", {
# PipelinedRDD initially created from RDD
rdd <- lapply(rdd, function(x) paste(x, x))
}
- collect(rdd)
+ collectRDD(rdd)
unlink(fileName)
})
@@ -85,7 +85,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
rdd <- textFile(sc, fileName1, 1L)
saveAsTextFile(rdd, fileName2)
rdd <- textFile(sc, fileName2)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName1)
unlink(fileName2)
@@ -97,7 +97,7 @@ test_that("saveAsTextFile() on a parallelized list works as expected", {
rdd <- parallelize(sc, l, 1L)
saveAsTextFile(rdd, fileName)
rdd <- textFile(sc, fileName)
- expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
+ expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)}))
unlink(fileName)
})
@@ -117,7 +117,7 @@ test_that("textFile() and saveAsTextFile() word count works as expected", {
saveAsTextFile(counts, fileName2)
rdd <- textFile(sc, fileName2)
- output <- collect(rdd)
+ output <- collectRDD(rdd)
expected <- list(list("awesome.", 1), list("Spark", 2),
list("pretty.", 1), list("is", 2))
expectedStr <- lapply(expected, function(x) { toString(x) })
@@ -134,7 +134,7 @@ test_that("textFile() on multiple paths", {
writeLines("Spark is awesome.", fileName2)
rdd <- textFile(sc, c(fileName1, fileName2))
- expect_equal(count(rdd), 2)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName1)
unlink(fileName2)
@@ -147,16 +147,16 @@ test_that("Pipelined operations on RDDs created using textFile", {
rdd <- textFile(sc, fileName)
lengths <- lapply(rdd, function(x) { length(x) })
- expect_equal(collect(lengths), list(1, 1))
+ expect_equal(collectRDD(lengths), list(1, 1))
lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
- expect_equal(collect(lengthsPipelined), list(11, 11))
+ expect_equal(collectRDD(lengthsPipelined), list(11, 11))
lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
- expect_equal(collect(lengths30), list(31, 31))
+ expect_equal(collectRDD(lengths30), list(31, 31))
lengths20 <- lapply(lengths, function(x) { x + 20 })
- expect_equal(collect(lengths20), list(21, 21))
+ expect_equal(collectRDD(lengths20), list(21, 21))
unlink(fileName)
})
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 58ff3debfa..83e94a1432 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -24,7 +24,7 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
test_that("convertJListToRList() gives back (deserializes) the original JLists
of strings and integers", {
# It's hard to manually create a Java List using rJava, since it does not
- # support generics well. Instead, we rely on collect() returning a
+ # support generics well. Instead, we rely on collectRDD() returning a
# JList.
nums <- as.list(1:10)
rdd <- parallelize(sc, nums, 1L)
@@ -48,7 +48,7 @@ test_that("serializeToBytes on RDD", {
text.rdd <- textFile(sc, fileName)
expect_equal(getSerializedMode(text.rdd), "string")
ser.rdd <- serializeToBytes(text.rdd)
- expect_equal(collect(ser.rdd), as.list(mockFile))
+ expect_equal(collectRDD(ser.rdd), as.list(mockFile))
expect_equal(getSerializedMode(ser.rdd), "byte")
unlink(fileName)
@@ -128,7 +128,7 @@ test_that("cleanClosure on R functions", {
env <- environment(newF)
expect_equal(ls(env), "t")
expect_equal(get("t", envir = env, inherits = FALSE), t)
- actual <- collect(lapply(rdd, f))
+ actual <- collectRDD(lapply(rdd, f))
expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
expect_equal(actual, expected)