aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/tests
diff options
context:
space:
mode:
authorhlin09 <hlin09pu@gmail.com>2015-04-27 15:04:37 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-04-27 15:04:37 -0700
commitca9f4ebb8e510e521bf4df0331375ddb385fb9d2 (patch)
tree768b88696324e9e65fdfaaccacea92ee82bb250e /R/pkg/inst/tests
parentef82bddc11d1aea42e22d2f85613a869cbe9a990 (diff)
downloadspark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.gz
spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.bz2
spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.zip
[SPARK-6991] [SPARKR] Adds support for zipPartitions.
Author: hlin09 <hlin09pu@gmail.com> Closes #5568 from hlin09/zipPartitions and squashes the following commits: 12c08a5 [hlin09] Fix comments d2d32db [hlin09] Merge branch 'master' into zipPartitions ec56d2f [hlin09] Fix test. 27655d3 [hlin09] Adds support for zipPartitions.
Diffstat (limited to 'R/pkg/inst/tests')
-rw-r--r--R/pkg/inst/tests/test_binary_function.R33
1 files changed, 33 insertions, 0 deletions
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
index c15553ba28..6785a7bdae 100644
--- a/R/pkg/inst/tests/test_binary_function.R
+++ b/R/pkg/inst/tests/test_binary_function.R
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
})
+
+test_that("zipPartitions() on RDDs", {
+ rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+ rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+ rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+ actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+ func = function(x, y, z) { list(list(x, y, z))} ))
+ expect_equal(actual,
+ list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
+
+ mockFile = c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collect(zipPartitions(rdd, rdd,
+ func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+ expected <- list(paste(mockFile, mockFile, sep = "\n"))
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collect(zipPartitions(rdd1, rdd,
+ func = function(x, y) { list(x + nchar(y)) }))
+ expected <- list(0:1 + nchar(mockFile))
+ expect_equal(actual, expected)
+
+ rdd <- map(rdd, function(x) { x })
+ actual <- collect(zipPartitions(rdd, rdd1,
+ func = function(x, y) { list(y + nchar(x)) }))
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})