aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/tests/testthat
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/inst/tests/testthat')
-rw-r--r--R/pkg/inst/tests/testthat/test_binaryFile.R8
-rw-r--r--R/pkg/inst/tests/testthat/test_binary_function.R18
-rw-r--r--R/pkg/inst/tests/testthat/test_broadcast.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_context.R6
-rw-r--r--R/pkg/inst/tests/testthat/test_includePackage.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_parallelize_collect.R26
-rw-r--r--R/pkg/inst/tests/testthat/test_rdd.R172
-rw-r--r--R/pkg/inst/tests/testthat/test_shuffle.R34
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R28
-rw-r--r--R/pkg/inst/tests/testthat/test_take.R32
-rw-r--r--R/pkg/inst/tests/testthat/test_textFile.R26
-rw-r--r--R/pkg/inst/tests/testthat/test_utils.R6
12 files changed, 182 insertions, 182 deletions
diff --git a/R/pkg/inst/tests/testthat/test_binaryFile.R b/R/pkg/inst/tests/testthat/test_binaryFile.R
index 56ac8eb728..b5c279e315 100644
--- a/R/pkg/inst/tests/testthat/test_binaryFile.R
+++ b/R/pkg/inst/tests/testthat/test_binaryFile.R
@@ -31,7 +31,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
rdd <- textFile(sc, fileName1, 1)
saveAsObjectFile(rdd, fileName2)
rdd <- objectFile(sc, fileName2)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName1)
unlink(fileName2, recursive = TRUE)
@@ -44,7 +44,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
rdd <- parallelize(sc, l, 1)
saveAsObjectFile(rdd, fileName)
rdd <- objectFile(sc, fileName)
- expect_equal(collect(rdd), l)
+ expect_equal(collectRDD(rdd), l)
unlink(fileName, recursive = TRUE)
})
@@ -64,7 +64,7 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
saveAsObjectFile(counts, fileName2)
counts <- objectFile(sc, fileName2)
- output <- collect(counts)
+ output <- collectRDD(counts)
expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
list("is", 2))
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
@@ -83,7 +83,7 @@ test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
saveAsObjectFile(rdd2, fileName2)
rdd <- objectFile(sc, c(fileName1, fileName2))
- expect_equal(count(rdd), 2)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName1, recursive = TRUE)
unlink(fileName2, recursive = TRUE)
diff --git a/R/pkg/inst/tests/testthat/test_binary_function.R b/R/pkg/inst/tests/testthat/test_binary_function.R
index ae7abe20cc..59cb2e6204 100644
--- a/R/pkg/inst/tests/testthat/test_binary_function.R
+++ b/R/pkg/inst/tests/testthat/test_binary_function.R
@@ -29,7 +29,7 @@ rdd <- parallelize(sc, nums, 2L)
mockFile <- c("Spark is pretty.", "Spark is awesome.")
test_that("union on two RDDs", {
- actual <- collect(unionRDD(rdd, rdd))
+ actual <- collectRDD(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
@@ -37,13 +37,13 @@ test_that("union on two RDDs", {
text.rdd <- textFile(sc, fileName)
union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
+ actual <- collectRDD(union.rdd)
expect_equal(actual, c(as.list(nums), mockFile))
expect_equal(getSerializedMode(union.rdd), "byte")
rdd <- map(text.rdd, function(x) {x})
union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
+ actual <- collectRDD(union.rdd)
expect_equal(actual, as.list(c(mockFile, mockFile)))
expect_equal(getSerializedMode(union.rdd), "byte")
@@ -54,14 +54,14 @@ test_that("cogroup on two RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
+ actual <- collectRDD(cogroup.rdd)
expect_equal(actual,
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
+ actual <- collectRDD(cogroup.rdd)
expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
expect_equal(sortKeyValueList(actual),
@@ -72,7 +72,7 @@ test_that("zipPartitions() on RDDs", {
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
- actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+ actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))
@@ -82,19 +82,19 @@ test_that("zipPartitions() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipPartitions(rdd, rdd,
+ actual <- collectRDD(zipPartitions(rdd, rdd,
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
expected <- list(paste(mockFile, mockFile, sep = "\n"))
expect_equal(actual, expected)
rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipPartitions(rdd1, rdd,
+ actual <- collectRDD(zipPartitions(rdd1, rdd,
func = function(x, y) { list(x + nchar(y)) }))
expected <- list(0:1 + nchar(mockFile))
expect_equal(actual, expected)
rdd <- map(rdd, function(x) { x })
- actual <- collect(zipPartitions(rdd, rdd1,
+ actual <- collectRDD(zipPartitions(rdd, rdd1,
func = function(x, y) { list(y + nchar(x)) }))
expect_equal(actual, expected)
diff --git a/R/pkg/inst/tests/testthat/test_broadcast.R b/R/pkg/inst/tests/testthat/test_broadcast.R
index c7fefb5cf9..65f204d096 100644
--- a/R/pkg/inst/tests/testthat/test_broadcast.R
+++ b/R/pkg/inst/tests/testthat/test_broadcast.R
@@ -32,7 +32,7 @@ test_that("using broadcast variable", {
useBroadcast <- function(x) {
sum(SparkR:::value(randomMatBr) * x)
}
- actual <- collect(lapply(rrdd, useBroadcast))
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})
@@ -43,7 +43,7 @@ test_that("without using broadcast variable", {
useBroadcast <- function(x) {
sum(randomMat * x)
}
- actual <- collect(lapply(rrdd, useBroadcast))
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
expect_equal(actual, expected)
})
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 8bd134a58d..1ab7f319df 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -58,7 +58,7 @@ test_that("repeatedly starting and stopping SparkR", {
for (i in 1:4) {
sc <- suppressWarnings(sparkR.init())
rdd <- parallelize(sc, 1:20, 2L)
- expect_equal(count(rdd), 20)
+ expect_equal(countRDD(rdd), 20)
suppressWarnings(sparkR.stop())
}
})
@@ -90,8 +90,8 @@ test_that("rdd GC across sparkR.stop", {
rm(rdd2)
gc()
- count(rdd3)
- count(rdd4)
+ countRDD(rdd3)
+ countRDD(rdd4)
sparkR.session.stop()
})
diff --git a/R/pkg/inst/tests/testthat/test_includePackage.R b/R/pkg/inst/tests/testthat/test_includePackage.R
index ca2b900572..563ea298c2 100644
--- a/R/pkg/inst/tests/testthat/test_includePackage.R
+++ b/R/pkg/inst/tests/testthat/test_includePackage.R
@@ -37,7 +37,7 @@ test_that("include inside function", {
}
data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)
+ actual <- collectRDD(data)
}
})
@@ -53,7 +53,7 @@ test_that("use include package", {
includePackage(sc, plyr)
data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)
+ actual <- collectRDD(data)
}
})
diff --git a/R/pkg/inst/tests/testthat/test_parallelize_collect.R b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
index 959d7ab9e6..55972e1ba4 100644
--- a/R/pkg/inst/tests/testthat/test_parallelize_collect.R
+++ b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
@@ -67,22 +67,22 @@ test_that("parallelize() on simple vectors and lists returns an RDD", {
test_that("collect(), following a parallelize(), gives back the original collections", {
numVectorRDD <- parallelize(jsc, numVector, 10)
- expect_equal(collect(numVectorRDD), as.list(numVector))
+ expect_equal(collectRDD(numVectorRDD), as.list(numVector))
numListRDD <- parallelize(jsc, numList, 1)
numListRDD2 <- parallelize(jsc, numList, 4)
- expect_equal(collect(numListRDD), as.list(numList))
- expect_equal(collect(numListRDD2), as.list(numList))
+ expect_equal(collectRDD(numListRDD), as.list(numList))
+ expect_equal(collectRDD(numListRDD2), as.list(numList))
strVectorRDD <- parallelize(jsc, strVector, 2)
strVectorRDD2 <- parallelize(jsc, strVector, 3)
- expect_equal(collect(strVectorRDD), as.list(strVector))
- expect_equal(collect(strVectorRDD2), as.list(strVector))
+ expect_equal(collectRDD(strVectorRDD), as.list(strVector))
+ expect_equal(collectRDD(strVectorRDD2), as.list(strVector))
strListRDD <- parallelize(jsc, strList, 4)
strListRDD2 <- parallelize(jsc, strList, 1)
- expect_equal(collect(strListRDD), as.list(strList))
- expect_equal(collect(strListRDD2), as.list(strList))
+ expect_equal(collectRDD(strListRDD), as.list(strList))
+ expect_equal(collectRDD(strListRDD2), as.list(strList))
})
test_that("regression: collect() following a parallelize() does not drop elements", {
@@ -90,7 +90,7 @@ test_that("regression: collect() following a parallelize() does not drop element
collLen <- 10
numPart <- 6
expected <- runif(collLen)
- actual <- collect(parallelize(jsc, expected, numPart))
+ actual <- collectRDD(parallelize(jsc, expected, numPart))
expect_equal(actual, as.list(expected))
})
@@ -99,14 +99,14 @@ test_that("parallelize() and collect() work for lists of pairs (pairwise data)",
numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
- expect_equal(collect(numPairsRDDD1), numPairs)
- expect_equal(collect(numPairsRDDD2), numPairs)
- expect_equal(collect(numPairsRDDD3), numPairs)
+ expect_equal(collectRDD(numPairsRDDD1), numPairs)
+ expect_equal(collectRDD(numPairsRDDD2), numPairs)
+ expect_equal(collectRDD(numPairsRDDD3), numPairs)
# can also leave out the parameter name, if the params are supplied in order
strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
- expect_equal(collect(strPairsRDDD1), strPairs)
- expect_equal(collect(strPairsRDDD2), strPairs)
+ expect_equal(collectRDD(strPairsRDDD1), strPairs)
+ expect_equal(collectRDD(strPairsRDDD2), strPairs)
})
sparkR.session.stop()
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 508a3a7dfd..a3d66c245a 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -34,14 +34,14 @@ test_that("get number of partitions in RDD", {
})
test_that("first on RDD", {
- expect_equal(first(rdd), 1)
+ expect_equal(firstRDD(rdd), 1)
newrdd <- lapply(rdd, function(x) x + 1)
- expect_equal(first(newrdd), 2)
+ expect_equal(firstRDD(newrdd), 2)
})
test_that("count and length on RDD", {
- expect_equal(count(rdd), 10)
- expect_equal(length(rdd), 10)
+ expect_equal(countRDD(rdd), 10)
+ expect_equal(lengthRDD(rdd), 10)
})
test_that("count by values and keys", {
@@ -57,40 +57,40 @@ test_that("count by values and keys", {
test_that("lapply on RDD", {
multiples <- lapply(rdd, function(x) { 2 * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 2))
})
test_that("lapplyPartition on RDD", {
sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("mapPartitions on RDD", {
sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("flatMap() on RDDs", {
flat <- flatMap(intRdd, function(x) { list(x, x) })
- actual <- collect(flat)
+ actual <- collectRDD(flat)
expect_equal(actual, rep(intPairs, each = 2))
})
test_that("filterRDD on RDD", {
filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(2, 4, 6, 8, 10))
filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(list(1L, -1)))
# Filter out all elements.
filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list())
})
@@ -110,7 +110,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
part <- as.list(unlist(part) * partIndex + i)
})
rdd2 <- lapply(rdd2, function(x) x + x)
- actual <- collect(rdd2)
+ actual <- collectRDD(rdd2)
expected <- list(24, 24, 24, 24, 24,
168, 170, 172, 174, 176)
expect_equal(actual, expected)
@@ -126,20 +126,20 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
part <- as.list(unlist(part) * partIndex)
})
- cache(rdd2)
+ cacheRDD(rdd2)
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
- persist(rdd2, "MEMORY_AND_DISK")
+ persistRDD(rdd2, "MEMORY_AND_DISK")
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
tempDir <- tempfile(pattern = "checkpoint")
@@ -152,7 +152,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
expect_false(rdd2@env$isCheckpointed)
# make sure the data is collectable
- collect(rdd2)
+ collectRDD(rdd2)
unlink(tempDir)
})
@@ -169,21 +169,21 @@ test_that("reduce on RDD", {
test_that("lapply with dependency", {
fa <- 5
multiples <- lapply(rdd, function(x) { fa * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 5))
})
test_that("lapplyPartitionsWithIndex on RDDs", {
func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
- actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+ actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
expect_equal(actual, list(list(0, 15), list(1, 40)))
pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
mkTup <- function(partIndex, part) { list(partIndex, part) }
- actual <- collect(lapplyPartitionsWithIndex(
- partitionBy(pairsRDD, 2L, partitionByParity),
+ actual <- collectRDD(lapplyPartitionsWithIndex(
+ partitionByRDD(pairsRDD, 2L, partitionByParity),
mkTup),
FALSE)
expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
@@ -191,7 +191,7 @@ test_that("lapplyPartitionsWithIndex on RDDs", {
})
test_that("sampleRDD() on RDDs", {
- expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+ expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
})
test_that("takeSample() on RDDs", {
@@ -238,7 +238,7 @@ test_that("takeSample() on RDDs", {
test_that("mapValues() on pairwise RDDs", {
multiples <- mapValues(intRdd, function(x) { x * 2 })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expected <- lapply(intPairs, function(x) {
list(x[[1]], x[[2]] * 2)
})
@@ -247,11 +247,11 @@ test_that("mapValues() on pairwise RDDs", {
test_that("flatMapValues() on pairwise RDDs", {
l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
- actual <- collect(flatMapValues(l, function(x) { x }))
+ actual <- collectRDD(flatMapValues(l, function(x) { x }))
expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
# Generate x to x+1 for every value
- actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) }))
+ actual <- collectRDD(flatMapValues(intRdd, function(x) { x: (x + 1) }))
expect_equal(actual,
list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
@@ -273,8 +273,8 @@ test_that("reduceByKeyLocally() on PairwiseRDDs", {
test_that("distinct() on RDDs", {
nums.rep2 <- rep(1:10, 2)
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
- uniques <- distinct(rdd.rep2)
- actual <- sort(unlist(collect(uniques)))
+ uniques <- distinctRDD(rdd.rep2)
+ actual <- sort(unlist(collectRDD(uniques)))
expect_equal(actual, nums)
})
@@ -296,7 +296,7 @@ test_that("sumRDD() on RDDs", {
test_that("keyBy on RDDs", {
func <- function(x) { x * x }
keys <- keyBy(rdd, func)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})
@@ -304,12 +304,12 @@ test_that("repartition/coalesce on RDDs", {
rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
# repartition
- r1 <- repartition(rdd, 2)
+ r1 <- repartitionRDD(rdd, 2)
expect_equal(getNumPartitions(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)
- r2 <- repartition(rdd, 6)
+ r2 <- repartitionRDD(rdd, 6)
expect_equal(getNumPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)
@@ -323,12 +323,12 @@ test_that("repartition/coalesce on RDDs", {
test_that("sortBy() on RDDs", {
sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, as.list(nums))
})
@@ -380,13 +380,13 @@ test_that("aggregateRDD() on RDDs", {
test_that("zipWithUniqueId() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 3), list("c", 1),
list("d", 4), list("e", 2))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -394,13 +394,13 @@ test_that("zipWithUniqueId() on RDDs", {
test_that("zipWithIndex() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -408,35 +408,35 @@ test_that("zipWithIndex() on RDDs", {
test_that("glom() on RDD", {
rdd <- parallelize(sc, as.list(1:4), 2L)
- actual <- collect(glom(rdd))
+ actual <- collectRDD(glom(rdd))
expect_equal(actual, list(list(1, 2), list(3, 4)))
})
test_that("keys() on RDDs", {
keys <- keys(intRdd)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
})
test_that("values() on RDDs", {
values <- values(intRdd)
- actual <- collect(values)
+ actual <- collectRDD(values)
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})
test_that("pipeRDD() on RDDs", {
- actual <- collect(pipeRDD(rdd, "more"))
+ actual <- collectRDD(pipeRDD(rdd, "more"))
expected <- as.list(as.character(1:10))
expect_equal(actual, expected)
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
- actual <- collect(pipeRDD(trailed.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
expected <- list("", "1", "2", "3")
expect_equal(actual, expected)
rev.nums <- 9:0
rev.rdd <- parallelize(sc, rev.nums, 2L)
- actual <- collect(pipeRDD(rev.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
expected <- as.list(as.character(c(5:9, 0:4)))
expect_equal(actual, expected)
})
@@ -444,7 +444,7 @@ test_that("pipeRDD() on RDDs", {
test_that("zipRDD() on RDDs", {
rdd1 <- parallelize(sc, 0:4, 2)
rdd2 <- parallelize(sc, 1000:1004, 2)
- actual <- collect(zipRDD(rdd1, rdd2))
+ actual <- collectRDD(zipRDD(rdd1, rdd2))
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
@@ -453,17 +453,17 @@ test_that("zipRDD() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipRDD(rdd, rdd))
+ actual <- collectRDD(zipRDD(rdd, rdd))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipRDD(rdd1, rdd))
+ actual <- collectRDD(zipRDD(rdd1, rdd))
expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
expect_equal(actual, expected)
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(zipRDD(rdd, rdd1))
+ actual <- collectRDD(zipRDD(rdd, rdd1))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
@@ -472,7 +472,7 @@ test_that("zipRDD() on RDDs", {
test_that("cartesian() on RDDs", {
rdd <- parallelize(sc, 1:3)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(1, 1), list(1, 2), list(1, 3),
@@ -481,7 +481,7 @@ test_that("cartesian() on RDDs", {
# test case where one RDD is empty
emptyRdd <- parallelize(sc, list())
- actual <- collect(cartesian(rdd, emptyRdd))
+ actual <- collectRDD(cartesian(rdd, emptyRdd))
expect_equal(actual, list())
mockFile <- c("Spark is pretty.", "Spark is awesome.")
@@ -489,7 +489,7 @@ test_that("cartesian() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expected <- list(
list("Spark is awesome.", "Spark is pretty."),
list("Spark is awesome.", "Spark is awesome."),
@@ -498,7 +498,7 @@ test_that("cartesian() on RDDs", {
expect_equal(sortKeyValueList(actual), expected)
rdd1 <- parallelize(sc, 0:1)
- actual <- collect(cartesian(rdd1, rdd))
+ actual <- collectRDD(cartesian(rdd1, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(0, "Spark is pretty."),
@@ -507,7 +507,7 @@ test_that("cartesian() on RDDs", {
list(1, "Spark is awesome.")))
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(cartesian(rdd, rdd1))
+ actual <- collectRDD(cartesian(rdd, rdd1))
expect_equal(sortKeyValueList(actual), expected)
unlink(fileName)
@@ -518,24 +518,24 @@ test_that("subtract() on RDDs", {
rdd1 <- parallelize(sc, l)
# subtract by itself
- actual <- collect(subtract(rdd1, rdd1))
+ actual <- collectRDD(subtract(rdd1, rdd1))
expect_equal(actual, list())
# subtract by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
l)
rdd2 <- parallelize(sc, list(2, 4))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
list(1, 1, 3))
l <- list("a", "a", "b", "b", "c", "d")
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list("b", "d"))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
list("a", "a", "c"))
})
@@ -546,17 +546,17 @@ test_that("subtractByKey() on pairwise RDDs", {
rdd1 <- parallelize(sc, l)
# subtractByKey by itself
- actual <- collect(subtractByKey(rdd1, rdd1))
+ actual <- collectRDD(subtractByKey(rdd1, rdd1))
expect_equal(actual, list())
# subtractByKey by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(l))
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list("b", 4), list("b", 5)))
@@ -564,76 +564,76 @@ test_that("subtractByKey() on pairwise RDDs", {
list(2, 5), list(1, 2))
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list(2, 4), list(2, 5)))
})
test_that("intersection() on RDDs", {
# intersection with self
- actual <- collect(intersection(rdd, rdd))
+ actual <- collectRDD(intersection(rdd, rdd))
expect_equal(sort(as.integer(actual)), nums)
# intersection with an empty RDD
emptyRdd <- parallelize(sc, list())
- actual <- collect(intersection(rdd, emptyRdd))
+ actual <- collectRDD(intersection(rdd, emptyRdd))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
- actual <- collect(intersection(rdd1, rdd2))
+ actual <- collectRDD(intersection(rdd1, rdd2))
expect_equal(sort(as.integer(actual)), 1:3)
})
test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
})
test_that("leftOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
@@ -642,26 +642,26 @@ test_that("leftOuterJoin() on pairwise RDDs", {
test_that("rightOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})
@@ -669,14 +669,14 @@ test_that("rightOuterJoin() on pairwise RDDs", {
test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
@@ -684,14 +684,14 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
@@ -700,21 +700,21 @@ test_that("fullOuterJoin() on pairwise RDDs", {
test_that("sortByKey() on pairwise RDDs", {
numPairsRdd <- map(rdd, function(x) { list (x, x) })
sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
numPairs <- lapply(nums, function(x) { list (x, x) })
expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
sortedRdd2 <- sortByKey(numPairsRdd2)
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, numPairs)
# sort by string keys
l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
rdd3 <- parallelize(sc, l, 2L)
sortedRdd3 <- sortByKey(rdd3)
- actual <- collect(sortedRdd3)
+ actual <- collectRDD(sortedRdd3)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# test on the boundary cases
@@ -722,27 +722,27 @@ test_that("sortByKey() on pairwise RDDs", {
# boundary case 1: the RDD to be sorted has only 1 partition
rdd4 <- parallelize(sc, l, 1L)
sortedRdd4 <- sortByKey(rdd4)
- actual <- collect(sortedRdd4)
+ actual <- collectRDD(sortedRdd4)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 2: the sorted RDD has only 1 partition
rdd5 <- parallelize(sc, l, 2L)
sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
- actual <- collect(sortedRdd5)
+ actual <- collectRDD(sortedRdd5)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 3: the RDD to be sorted has only 1 element
l2 <- list(list("a", 1))
rdd6 <- parallelize(sc, l2, 2L)
sortedRdd6 <- sortByKey(rdd6)
- actual <- collect(sortedRdd6)
+ actual <- collectRDD(sortedRdd6)
expect_equal(actual, l2)
# boundary case 4: the RDD to be sorted has 0 element
l3 <- list()
rdd7 <- parallelize(sc, l3, 2L)
sortedRdd7 <- sortByKey(rdd7)
- actual <- collect(sortedRdd7)
+ actual <- collectRDD(sortedRdd7)
expect_equal(actual, l3)
})
@@ -766,7 +766,7 @@ test_that("collectAsMap() on a pairwise RDD", {
test_that("show()", {
rdd <- parallelize(sc, list(1:10))
- expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+ expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})
test_that("sampleByKey() on pairwise RDDs", {
diff --git a/R/pkg/inst/tests/testthat/test_shuffle.R b/R/pkg/inst/tests/testthat/test_shuffle.R
index 2586056773..d38efab0fd 100644
--- a/R/pkg/inst/tests/testthat/test_shuffle.R
+++ b/R/pkg/inst/tests/testthat/test_shuffle.R
@@ -39,7 +39,7 @@ strListRDD <- parallelize(sc, strList, 4)
test_that("groupByKey for integers", {
grouped <- groupByKey(intRdd, 2L)
- actual <- collect(grouped)
+ actual <- collectRDD(grouped)
expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -48,7 +48,7 @@ test_that("groupByKey for integers", {
test_that("groupByKey for doubles", {
grouped <- groupByKey(doubleRdd, 2L)
- actual <- collect(grouped)
+ actual <- collectRDD(grouped)
expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -57,7 +57,7 @@ test_that("groupByKey for doubles", {
test_that("reduceByKey for ints", {
reduced <- reduceByKey(intRdd, "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -65,7 +65,7 @@ test_that("reduceByKey for ints", {
test_that("reduceByKey for doubles", {
reduced <- reduceByKey(doubleRdd, "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -74,7 +74,7 @@ test_that("reduceByKey for doubles", {
test_that("combineByKey for ints", {
reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -82,7 +82,7 @@ test_that("combineByKey for ints", {
test_that("combineByKey for doubles", {
reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -94,7 +94,7 @@ test_that("combineByKey for characters", {
list("other", 3L), list("max", 4L)), 2L)
reduced <- combineByKey(stringKeyRDD,
function(x) { x }, "+", "+", 2L)
- actual <- collect(reduced)
+ actual <- collectRDD(reduced)
expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -109,7 +109,7 @@ test_that("aggregateByKey", {
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
- actual <- collect(aggregatedRDD)
+ actual <- collectRDD(aggregatedRDD)
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -122,7 +122,7 @@ test_that("aggregateByKey", {
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
- actual <- collect(aggregatedRDD)
+ actual <- collectRDD(aggregatedRDD)
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -132,7 +132,7 @@ test_that("foldByKey", {
# test foldByKey for int keys
folded <- foldByKey(intRdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -140,7 +140,7 @@ test_that("foldByKey", {
# test foldByKey for double keys
folded <- foldByKey(doubleRdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -151,7 +151,7 @@ test_that("foldByKey", {
stringKeyRDD <- parallelize(sc, stringKeyPairs)
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list("b", 101), list("a", 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -159,14 +159,14 @@ test_that("foldByKey", {
# test foldByKey for empty pair RDD
rdd <- parallelize(sc, list())
folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list()
expect_equal(actual, expected)
# test foldByKey for RDD with only 1 pair
rdd <- parallelize(sc, list(list(1, 1)))
folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collect(folded)
+ actual <- collectRDD(folded)
expected <- list(list(1, 1))
expect_equal(actual, expected)
})
@@ -175,7 +175,7 @@ test_that("partitionBy() partitions data correctly", {
# Partition by magnitude
partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
- resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
+ resultRDD <- partitionByRDD(numPairsRdd, 2L, partitionByMagnitude)
expected_first <- list(list(1, 100), list(2, 200)) # key less than 3
expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key greater than or equal 3
@@ -191,7 +191,7 @@ test_that("partitionBy works with dependencies", {
partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
# Partition by parity
- resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
+ resultRDD <- partitionByRDD(numPairsRdd, numPartitions = 2L, partitionByParity)
# keys even; 100 %% 2 == 0
expected_first <- list(list(2, 200), list(4, -1))
@@ -208,7 +208,7 @@ test_that("test partitionBy with string keys", {
words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
wordCount <- lapply(words, function(word) { list(word, 1L) })
- resultRDD <- partitionBy(wordCount, 2L)
+ resultRDD <- partitionByRDD(wordCount, 2L)
expected_first <- list(list("Dexter", 1), list("Dexter", 1))
expected_second <- list(list("and", 1), list("and", 1))
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 39ed4febe5..3ccb8b6d77 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -490,7 +490,7 @@ test_that("read/write json files", {
test_that("jsonRDD() on a RDD with json string", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
rdd <- parallelize(sc, mockLines)
- expect_equal(count(rdd), 3)
+ expect_equal(countRDD(rdd), 3)
df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
@@ -582,7 +582,7 @@ test_that("toRDD() returns an RRDD", {
df <- read.json(jsonPath)
testRDD <- toRDD(df)
expect_is(testRDD, "RDD")
- expect_equal(count(testRDD), 3)
+ expect_equal(countRDD(testRDD), 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
@@ -592,7 +592,7 @@ test_that("union on two RDDs created from DataFrames returns an RRDD", {
unioned <- unionRDD(RDD1, RDD2)
expect_is(unioned, "RDD")
expect_equal(getSerializedMode(unioned), "byte")
- expect_equal(collect(unioned)[[2]]$name, "Andy")
+ expect_equal(collectRDD(unioned)[[2]]$name, "Andy")
})
test_that("union on mixed serialization types correctly returns a byte RRDD", {
@@ -614,14 +614,14 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
unionByte <- unionRDD(rdd, dfRDD)
expect_is(unionByte, "RDD")
expect_equal(getSerializedMode(unionByte), "byte")
- expect_equal(collect(unionByte)[[1]], 1)
- expect_equal(collect(unionByte)[[12]]$name, "Andy")
+ expect_equal(collectRDD(unionByte)[[1]], 1)
+ expect_equal(collectRDD(unionByte)[[12]]$name, "Andy")
unionString <- unionRDD(textRDD, dfRDD)
expect_is(unionString, "RDD")
expect_equal(getSerializedMode(unionString), "byte")
- expect_equal(collect(unionString)[[1]], "Michael")
- expect_equal(collect(unionString)[[5]]$name, "Andy")
+ expect_equal(collectRDD(unionString)[[1]], "Michael")
+ expect_equal(collectRDD(unionString)[[5]]$name, "Andy")
})
test_that("objectFile() works with row serialization", {
@@ -633,7 +633,7 @@ test_that("objectFile() works with row serialization", {
expect_is(objectIn, "RDD")
expect_equal(getSerializedMode(objectIn), "byte")
- expect_equal(collect(objectIn)[[2]]$age, 30)
+ expect_equal(collectRDD(objectIn)[[2]]$age, 30)
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
@@ -643,7 +643,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
row
})
expect_is(testRDD, "RDD")
- collected <- collect(testRDD)
+ collected <- collectRDD(testRDD)
expect_equal(collected[[1]]$name, "Michael")
expect_equal(collected[[2]]$newCol, 35)
})
@@ -715,10 +715,10 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
row
})
expect_is(second, "RDD")
- expect_equal(count(second), 3)
- expect_equal(collect(second)[[2]]$age, 35)
- expect_true(collect(second)[[2]]$testCol)
- expect_false(collect(second)[[3]]$testCol)
+ expect_equal(countRDD(second), 3)
+ expect_equal(collectRDD(second)[[2]]$age, 35)
+ expect_true(collectRDD(second)[[2]]$testCol)
+ expect_false(collectRDD(second)[[3]]$testCol)
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
@@ -1608,7 +1608,7 @@ test_that("toJSON() returns an RDD of the correct values", {
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(getSerializedMode(testRDD), "string")
- expect_equal(collect(testRDD)[[1]], mockLines[1])
+ expect_equal(collectRDD(testRDD)[[1]], mockLines[1])
})
test_that("showDF()", {
diff --git a/R/pkg/inst/tests/testthat/test_take.R b/R/pkg/inst/tests/testthat/test_take.R
index 07f00c9915..aaa532856c 100644
--- a/R/pkg/inst/tests/testthat/test_take.R
+++ b/R/pkg/inst/tests/testthat/test_take.R
@@ -36,34 +36,34 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
test_that("take() gives back the original elements in correct count and order", {
numVectorRDD <- parallelize(sc, numVector, 10)
# case: number of elements to take is less than the size of the first partition
- expect_equal(take(numVectorRDD, 1), as.list(head(numVector, n = 1)))
+ expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1)))
# case: number of elements to take is the same as the size of the first partition
- expect_equal(take(numVectorRDD, 11), as.list(head(numVector, n = 11)))
+ expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11)))
# case: number of elements to take is greater than all elements
- expect_equal(take(numVectorRDD, length(numVector)), as.list(numVector))
- expect_equal(take(numVectorRDD, length(numVector) + 1), as.list(numVector))
+ expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector))
+ expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector))
numListRDD <- parallelize(sc, numList, 1)
numListRDD2 <- parallelize(sc, numList, 4)
- expect_equal(take(numListRDD, 3), take(numListRDD2, 3))
- expect_equal(take(numListRDD, 5), take(numListRDD2, 5))
- expect_equal(take(numListRDD, 1), as.list(head(numList, n = 1)))
- expect_equal(take(numListRDD2, 999), numList)
+ expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3))
+ expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5))
+ expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1)))
+ expect_equal(takeRDD(numListRDD2, 999), numList)
strVectorRDD <- parallelize(sc, strVector, 2)
strVectorRDD2 <- parallelize(sc, strVector, 3)
- expect_equal(take(strVectorRDD, 4), as.list(strVector))
- expect_equal(take(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
+ expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector))
+ expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
strListRDD <- parallelize(sc, strList, 4)
strListRDD2 <- parallelize(sc, strList, 1)
- expect_equal(take(strListRDD, 3), as.list(head(strList, n = 3)))
- expect_equal(take(strListRDD2, 1), as.list(head(strList, n = 1)))
+ expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3)))
+ expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1)))
- expect_equal(length(take(strListRDD, 0)), 0)
- expect_equal(length(take(strVectorRDD, 0)), 0)
- expect_equal(length(take(numListRDD, 0)), 0)
- expect_equal(length(take(numVectorRDD, 0)), 0)
+ expect_equal(length(takeRDD(strListRDD, 0)), 0)
+ expect_equal(length(takeRDD(strVectorRDD, 0)), 0)
+ expect_equal(length(takeRDD(numListRDD, 0)), 0)
+ expect_equal(length(takeRDD(numVectorRDD, 0)), 0)
})
sparkR.session.stop()
diff --git a/R/pkg/inst/tests/testthat/test_textFile.R b/R/pkg/inst/tests/testthat/test_textFile.R
index b7dcbe472a..3b466066e9 100644
--- a/R/pkg/inst/tests/testthat/test_textFile.R
+++ b/R/pkg/inst/tests/testthat/test_textFile.R
@@ -29,8 +29,8 @@ test_that("textFile() on a local file returns an RDD", {
rdd <- textFile(sc, fileName)
expect_is(rdd, "RDD")
- expect_true(count(rdd) > 0)
- expect_equal(count(rdd), 2)
+ expect_true(countRDD(rdd) > 0)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName)
})
@@ -40,7 +40,7 @@ test_that("textFile() followed by a collect() returns the same content", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName)
})
@@ -55,7 +55,7 @@ test_that("textFile() word count works as expected", {
wordCount <- lapply(words, function(word) { list(word, 1L) })
counts <- reduceByKey(wordCount, "+", 2L)
- output <- collect(counts)
+ output <- collectRDD(counts)
expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
list("Spark", 2))
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
@@ -72,7 +72,7 @@ test_that("several transformations on RDD created by textFile()", {
# PipelinedRDD initially created from RDD
rdd <- lapply(rdd, function(x) paste(x, x))
}
- collect(rdd)
+ collectRDD(rdd)
unlink(fileName)
})
@@ -85,7 +85,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
rdd <- textFile(sc, fileName1, 1L)
saveAsTextFile(rdd, fileName2)
rdd <- textFile(sc, fileName2)
- expect_equal(collect(rdd), as.list(mockFile))
+ expect_equal(collectRDD(rdd), as.list(mockFile))
unlink(fileName1)
unlink(fileName2)
@@ -97,7 +97,7 @@ test_that("saveAsTextFile() on a parallelized list works as expected", {
rdd <- parallelize(sc, l, 1L)
saveAsTextFile(rdd, fileName)
rdd <- textFile(sc, fileName)
- expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
+ expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)}))
unlink(fileName)
})
@@ -117,7 +117,7 @@ test_that("textFile() and saveAsTextFile() word count works as expected", {
saveAsTextFile(counts, fileName2)
rdd <- textFile(sc, fileName2)
- output <- collect(rdd)
+ output <- collectRDD(rdd)
expected <- list(list("awesome.", 1), list("Spark", 2),
list("pretty.", 1), list("is", 2))
expectedStr <- lapply(expected, function(x) { toString(x) })
@@ -134,7 +134,7 @@ test_that("textFile() on multiple paths", {
writeLines("Spark is awesome.", fileName2)
rdd <- textFile(sc, c(fileName1, fileName2))
- expect_equal(count(rdd), 2)
+ expect_equal(countRDD(rdd), 2)
unlink(fileName1)
unlink(fileName2)
@@ -147,16 +147,16 @@ test_that("Pipelined operations on RDDs created using textFile", {
rdd <- textFile(sc, fileName)
lengths <- lapply(rdd, function(x) { length(x) })
- expect_equal(collect(lengths), list(1, 1))
+ expect_equal(collectRDD(lengths), list(1, 1))
lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
- expect_equal(collect(lengthsPipelined), list(11, 11))
+ expect_equal(collectRDD(lengthsPipelined), list(11, 11))
lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
- expect_equal(collect(lengths30), list(31, 31))
+ expect_equal(collectRDD(lengths30), list(31, 31))
lengths20 <- lapply(lengths, function(x) { x + 20 })
- expect_equal(collect(lengths20), list(21, 21))
+ expect_equal(collectRDD(lengths20), list(21, 21))
unlink(fileName)
})
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 58ff3debfa..83e94a1432 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -24,7 +24,7 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
test_that("convertJListToRList() gives back (deserializes) the original JLists
of strings and integers", {
# It's hard to manually create a Java List using rJava, since it does not
- # support generics well. Instead, we rely on collect() returning a
+ # support generics well. Instead, we rely on collectRDD() returning a
# JList.
nums <- as.list(1:10)
rdd <- parallelize(sc, nums, 1L)
@@ -48,7 +48,7 @@ test_that("serializeToBytes on RDD", {
text.rdd <- textFile(sc, fileName)
expect_equal(getSerializedMode(text.rdd), "string")
ser.rdd <- serializeToBytes(text.rdd)
- expect_equal(collect(ser.rdd), as.list(mockFile))
+ expect_equal(collectRDD(ser.rdd), as.list(mockFile))
expect_equal(getSerializedMode(ser.rdd), "byte")
unlink(fileName)
@@ -128,7 +128,7 @@ test_that("cleanClosure on R functions", {
env <- environment(newF)
expect_equal(ls(env), "t")
expect_equal(get("t", envir = env, inherits = FALSE), t)
- actual <- collect(lapply(rdd, f))
+ actual <- collectRDD(lapply(rdd, f))
expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
expect_equal(actual, expected)