aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/tests/testthat/test_rdd.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/inst/tests/testthat/test_rdd.R')
-rw-r--r--R/pkg/inst/tests/testthat/test_rdd.R172
1 files changed, 86 insertions, 86 deletions
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 508a3a7dfd..a3d66c245a 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -34,14 +34,14 @@ test_that("get number of partitions in RDD", {
})
test_that("first on RDD", {
- expect_equal(first(rdd), 1)
+ expect_equal(firstRDD(rdd), 1)
newrdd <- lapply(rdd, function(x) x + 1)
- expect_equal(first(newrdd), 2)
+ expect_equal(firstRDD(newrdd), 2)
})
test_that("count and length on RDD", {
- expect_equal(count(rdd), 10)
- expect_equal(length(rdd), 10)
+ expect_equal(countRDD(rdd), 10)
+ expect_equal(lengthRDD(rdd), 10)
})
test_that("count by values and keys", {
@@ -57,40 +57,40 @@ test_that("count by values and keys", {
test_that("lapply on RDD", {
multiples <- lapply(rdd, function(x) { 2 * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 2))
})
test_that("lapplyPartition on RDD", {
sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("mapPartitions on RDD", {
sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
- actual <- collect(sums)
+ actual <- collectRDD(sums)
expect_equal(actual, list(15, 40))
})
test_that("flatMap() on RDDs", {
flat <- flatMap(intRdd, function(x) { list(x, x) })
- actual <- collect(flat)
+ actual <- collectRDD(flat)
expect_equal(actual, rep(intPairs, each = 2))
})
test_that("filterRDD on RDD", {
filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(2, 4, 6, 8, 10))
filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list(list(1L, -1)))
# Filter out all elements.
filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
- actual <- collect(filtered.rdd)
+ actual <- collectRDD(filtered.rdd)
expect_equal(actual, list())
})
@@ -110,7 +110,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
part <- as.list(unlist(part) * partIndex + i)
})
rdd2 <- lapply(rdd2, function(x) x + x)
- actual <- collect(rdd2)
+ actual <- collectRDD(rdd2)
expected <- list(24, 24, 24, 24, 24,
168, 170, 172, 174, 176)
expect_equal(actual, expected)
@@ -126,20 +126,20 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
part <- as.list(unlist(part) * partIndex)
})
- cache(rdd2)
+ cacheRDD(rdd2)
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
- persist(rdd2, "MEMORY_AND_DISK")
+ persistRDD(rdd2, "MEMORY_AND_DISK")
expect_true(rdd2@env$isCached)
rdd2 <- lapply(rdd2, function(x) x)
expect_false(rdd2@env$isCached)
- unpersist(rdd2)
+ unpersistRDD(rdd2)
expect_false(rdd2@env$isCached)
tempDir <- tempfile(pattern = "checkpoint")
@@ -152,7 +152,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
expect_false(rdd2@env$isCheckpointed)
# make sure the data is collectable
- collect(rdd2)
+ collectRDD(rdd2)
unlink(tempDir)
})
@@ -169,21 +169,21 @@ test_that("reduce on RDD", {
test_that("lapply with dependency", {
fa <- 5
multiples <- lapply(rdd, function(x) { fa * x })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expect_equal(actual, as.list(nums * 5))
})
test_that("lapplyPartitionsWithIndex on RDDs", {
func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
- actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+ actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
expect_equal(actual, list(list(0, 15), list(1, 40)))
pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
mkTup <- function(partIndex, part) { list(partIndex, part) }
- actual <- collect(lapplyPartitionsWithIndex(
- partitionBy(pairsRDD, 2L, partitionByParity),
+ actual <- collectRDD(lapplyPartitionsWithIndex(
+ partitionByRDD(pairsRDD, 2L, partitionByParity),
mkTup),
FALSE)
expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
@@ -191,7 +191,7 @@ test_that("lapplyPartitionsWithIndex on RDDs", {
})
test_that("sampleRDD() on RDDs", {
- expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+ expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
})
test_that("takeSample() on RDDs", {
@@ -238,7 +238,7 @@ test_that("takeSample() on RDDs", {
test_that("mapValues() on pairwise RDDs", {
multiples <- mapValues(intRdd, function(x) { x * 2 })
- actual <- collect(multiples)
+ actual <- collectRDD(multiples)
expected <- lapply(intPairs, function(x) {
list(x[[1]], x[[2]] * 2)
})
@@ -247,11 +247,11 @@ test_that("mapValues() on pairwise RDDs", {
test_that("flatMapValues() on pairwise RDDs", {
l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
- actual <- collect(flatMapValues(l, function(x) { x }))
+ actual <- collectRDD(flatMapValues(l, function(x) { x }))
expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
# Generate x to x+1 for every value
- actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) }))
+ actual <- collectRDD(flatMapValues(intRdd, function(x) { x: (x + 1) }))
expect_equal(actual,
list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
@@ -273,8 +273,8 @@ test_that("reduceByKeyLocally() on PairwiseRDDs", {
test_that("distinct() on RDDs", {
nums.rep2 <- rep(1:10, 2)
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
- uniques <- distinct(rdd.rep2)
- actual <- sort(unlist(collect(uniques)))
+ uniques <- distinctRDD(rdd.rep2)
+ actual <- sort(unlist(collectRDD(uniques)))
expect_equal(actual, nums)
})
@@ -296,7 +296,7 @@ test_that("sumRDD() on RDDs", {
test_that("keyBy on RDDs", {
func <- function(x) { x * x }
keys <- keyBy(rdd, func)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})
@@ -304,12 +304,12 @@ test_that("repartition/coalesce on RDDs", {
rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
# repartition
- r1 <- repartition(rdd, 2)
+ r1 <- repartitionRDD(rdd, 2)
expect_equal(getNumPartitions(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)
- r2 <- repartition(rdd, 6)
+ r2 <- repartitionRDD(rdd, 6)
expect_equal(getNumPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)
@@ -323,12 +323,12 @@ test_that("repartition/coalesce on RDDs", {
test_that("sortBy() on RDDs", {
sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, as.list(nums))
})
@@ -380,13 +380,13 @@ test_that("aggregateRDD() on RDDs", {
test_that("zipWithUniqueId() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 3), list("c", 1),
list("d", 4), list("e", 2))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithUniqueId(rdd))
+ actual <- collectRDD(zipWithUniqueId(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -394,13 +394,13 @@ test_that("zipWithUniqueId() on RDDs", {
test_that("zipWithIndex() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collect(zipWithIndex(rdd))
+ actual <- collectRDD(zipWithIndex(rdd))
expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
@@ -408,35 +408,35 @@ test_that("zipWithIndex() on RDDs", {
test_that("glom() on RDD", {
rdd <- parallelize(sc, as.list(1:4), 2L)
- actual <- collect(glom(rdd))
+ actual <- collectRDD(glom(rdd))
expect_equal(actual, list(list(1, 2), list(3, 4)))
})
test_that("keys() on RDDs", {
keys <- keys(intRdd)
- actual <- collect(keys)
+ actual <- collectRDD(keys)
expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
})
test_that("values() on RDDs", {
values <- values(intRdd)
- actual <- collect(values)
+ actual <- collectRDD(values)
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})
test_that("pipeRDD() on RDDs", {
- actual <- collect(pipeRDD(rdd, "more"))
+ actual <- collectRDD(pipeRDD(rdd, "more"))
expected <- as.list(as.character(1:10))
expect_equal(actual, expected)
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
- actual <- collect(pipeRDD(trailed.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
expected <- list("", "1", "2", "3")
expect_equal(actual, expected)
rev.nums <- 9:0
rev.rdd <- parallelize(sc, rev.nums, 2L)
- actual <- collect(pipeRDD(rev.rdd, "sort"))
+ actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
expected <- as.list(as.character(c(5:9, 0:4)))
expect_equal(actual, expected)
})
@@ -444,7 +444,7 @@ test_that("pipeRDD() on RDDs", {
test_that("zipRDD() on RDDs", {
rdd1 <- parallelize(sc, 0:4, 2)
rdd2 <- parallelize(sc, 1000:1004, 2)
- actual <- collect(zipRDD(rdd1, rdd2))
+ actual <- collectRDD(zipRDD(rdd1, rdd2))
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
@@ -453,17 +453,17 @@ test_that("zipRDD() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipRDD(rdd, rdd))
+ actual <- collectRDD(zipRDD(rdd, rdd))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipRDD(rdd1, rdd))
+ actual <- collectRDD(zipRDD(rdd1, rdd))
expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
expect_equal(actual, expected)
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(zipRDD(rdd, rdd1))
+ actual <- collectRDD(zipRDD(rdd, rdd1))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
@@ -472,7 +472,7 @@ test_that("zipRDD() on RDDs", {
test_that("cartesian() on RDDs", {
rdd <- parallelize(sc, 1:3)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(1, 1), list(1, 2), list(1, 3),
@@ -481,7 +481,7 @@ test_that("cartesian() on RDDs", {
# test case where one RDD is empty
emptyRdd <- parallelize(sc, list())
- actual <- collect(cartesian(rdd, emptyRdd))
+ actual <- collectRDD(cartesian(rdd, emptyRdd))
expect_equal(actual, list())
mockFile <- c("Spark is pretty.", "Spark is awesome.")
@@ -489,7 +489,7 @@ test_that("cartesian() on RDDs", {
writeLines(mockFile, fileName)
rdd <- textFile(sc, fileName)
- actual <- collect(cartesian(rdd, rdd))
+ actual <- collectRDD(cartesian(rdd, rdd))
expected <- list(
list("Spark is awesome.", "Spark is pretty."),
list("Spark is awesome.", "Spark is awesome."),
@@ -498,7 +498,7 @@ test_that("cartesian() on RDDs", {
expect_equal(sortKeyValueList(actual), expected)
rdd1 <- parallelize(sc, 0:1)
- actual <- collect(cartesian(rdd1, rdd))
+ actual <- collectRDD(cartesian(rdd1, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(0, "Spark is pretty."),
@@ -507,7 +507,7 @@ test_that("cartesian() on RDDs", {
list(1, "Spark is awesome.")))
rdd1 <- map(rdd, function(x) { x })
- actual <- collect(cartesian(rdd, rdd1))
+ actual <- collectRDD(cartesian(rdd, rdd1))
expect_equal(sortKeyValueList(actual), expected)
unlink(fileName)
@@ -518,24 +518,24 @@ test_that("subtract() on RDDs", {
rdd1 <- parallelize(sc, l)
# subtract by itself
- actual <- collect(subtract(rdd1, rdd1))
+ actual <- collectRDD(subtract(rdd1, rdd1))
expect_equal(actual, list())
# subtract by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
l)
rdd2 <- parallelize(sc, list(2, 4))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
list(1, 1, 3))
l <- list("a", "a", "b", "b", "c", "d")
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list("b", "d"))
- actual <- collect(subtract(rdd1, rdd2))
+ actual <- collectRDD(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
list("a", "a", "c"))
})
@@ -546,17 +546,17 @@ test_that("subtractByKey() on pairwise RDDs", {
rdd1 <- parallelize(sc, l)
# subtractByKey by itself
- actual <- collect(subtractByKey(rdd1, rdd1))
+ actual <- collectRDD(subtractByKey(rdd1, rdd1))
expect_equal(actual, list())
# subtractByKey by an empty RDD
rdd2 <- parallelize(sc, list())
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(l))
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list("b", 4), list("b", 5)))
@@ -564,76 +564,76 @@ test_that("subtractByKey() on pairwise RDDs", {
list(2, 5), list(1, 2))
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
- actual <- collect(subtractByKey(rdd1, rdd2))
+ actual <- collectRDD(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list(2, 4), list(2, 5)))
})
test_that("intersection() on RDDs", {
# intersection with self
- actual <- collect(intersection(rdd, rdd))
+ actual <- collectRDD(intersection(rdd, rdd))
expect_equal(sort(as.integer(actual)), nums)
# intersection with an empty RDD
emptyRdd <- parallelize(sc, list())
- actual <- collect(intersection(rdd, emptyRdd))
+ actual <- collectRDD(intersection(rdd, emptyRdd))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
- actual <- collect(intersection(rdd1, rdd2))
+ actual <- collectRDD(intersection(rdd1, rdd2))
expect_equal(sort(as.integer(actual)), 1:3)
})
test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(join(rdd1, rdd2, 2L))
+ actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
expect_equal(actual, list())
})
test_that("leftOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
@@ -642,26 +642,26 @@ test_that("leftOuterJoin() on pairwise RDDs", {
test_that("rightOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})
@@ -669,14 +669,14 @@ test_that("rightOuterJoin() on pairwise RDDs", {
test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
@@ -684,14 +684,14 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
@@ -700,21 +700,21 @@ test_that("fullOuterJoin() on pairwise RDDs", {
test_that("sortByKey() on pairwise RDDs", {
numPairsRdd <- map(rdd, function(x) { list (x, x) })
sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
- actual <- collect(sortedRdd)
+ actual <- collectRDD(sortedRdd)
numPairs <- lapply(nums, function(x) { list (x, x) })
expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
sortedRdd2 <- sortByKey(numPairsRdd2)
- actual <- collect(sortedRdd2)
+ actual <- collectRDD(sortedRdd2)
expect_equal(actual, numPairs)
# sort by string keys
l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
rdd3 <- parallelize(sc, l, 2L)
sortedRdd3 <- sortByKey(rdd3)
- actual <- collect(sortedRdd3)
+ actual <- collectRDD(sortedRdd3)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# test on the boundary cases
@@ -722,27 +722,27 @@ test_that("sortByKey() on pairwise RDDs", {
# boundary case 1: the RDD to be sorted has only 1 partition
rdd4 <- parallelize(sc, l, 1L)
sortedRdd4 <- sortByKey(rdd4)
- actual <- collect(sortedRdd4)
+ actual <- collectRDD(sortedRdd4)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 2: the sorted RDD has only 1 partition
rdd5 <- parallelize(sc, l, 2L)
sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
- actual <- collect(sortedRdd5)
+ actual <- collectRDD(sortedRdd5)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
# boundary case 3: the RDD to be sorted has only 1 element
l2 <- list(list("a", 1))
rdd6 <- parallelize(sc, l2, 2L)
sortedRdd6 <- sortByKey(rdd6)
- actual <- collect(sortedRdd6)
+ actual <- collectRDD(sortedRdd6)
expect_equal(actual, l2)
# boundary case 4: the RDD to be sorted has 0 element
l3 <- list()
rdd7 <- parallelize(sc, l3, 2L)
sortedRdd7 <- sortByKey(rdd7)
- actual <- collect(sortedRdd7)
+ actual <- collectRDD(sortedRdd7)
expect_equal(actual, l3)
})
@@ -766,7 +766,7 @@ test_that("collectAsMap() on a pairwise RDD", {
test_that("show()", {
rdd <- parallelize(sc, list(1:10))
- expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+ expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})
test_that("sampleByKey() on pairwise RDDs", {