diff options
author | hlin09 <hlin09pu@gmail.com> | 2015-04-27 15:04:37 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-04-27 15:04:37 -0700 |
commit | ca9f4ebb8e510e521bf4df0331375ddb385fb9d2 (patch) | |
tree | 768b88696324e9e65fdfaaccacea92ee82bb250e /R/pkg/inst/tests/test_binary_function.R | |
parent | ef82bddc11d1aea42e22d2f85613a869cbe9a990 (diff) | |
download | spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.gz spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.bz2 spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.zip |
[SPARK-6991] [SPARKR] Adds support for zipPartitions.
Author: hlin09 <hlin09pu@gmail.com>
Closes #5568 from hlin09/zipPartitions and squashes the following commits:
12c08a5 [hlin09] Fix comments
d2d32db [hlin09] Merge branch 'master' into zipPartitions
ec56d2f [hlin09] Fix test.
27655d3 [hlin09] Adds support for zipPartitions.
Diffstat (limited to 'R/pkg/inst/tests/test_binary_function.R')
-rw-r--r-- | R/pkg/inst/tests/test_binary_function.R | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R index c15553ba28..6785a7bdae 100644 --- a/R/pkg/inst/tests/test_binary_function.R +++ b/R/pkg/inst/tests/test_binary_function.R @@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", { expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) + +test_that("zipPartitions() on RDDs", { + rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 + rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 + rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 + actual <- collect(zipPartitions(rdd1, rdd2, rdd3, + func = function(x, y, z) { list(list(x, y, z))} )) + expect_equal(actual, + list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))) + + mockFile = c("Spark is pretty.", "Spark is awesome.") + fileName <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName, 1) + actual <- collect(zipPartitions(rdd, rdd, + func = function(x, y) { list(paste(x, y, sep = "\n")) })) + expected <- list(paste(mockFile, mockFile, sep = "\n")) + expect_equal(actual, expected) + + rdd1 <- parallelize(sc, 0:1, 1) + actual <- collect(zipPartitions(rdd1, rdd, + func = function(x, y) { list(x + nchar(y)) })) + expected <- list(0:1 + nchar(mockFile)) + expect_equal(actual, expected) + + rdd <- map(rdd, function(x) { x }) + actual <- collect(zipPartitions(rdd, rdd1, + func = function(x, y) { list(y + nchar(x)) })) + expect_equal(actual, expected) + + unlink(fileName) +}) |