# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # context("partitionBy, groupByKey, reduceByKey etc.") # JavaSparkContext handle sparkSession <- sparkR.session(enableHiveSupport = FALSE) sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) # Data intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200)) intRdd <- parallelize(sc, intPairs, 2L) doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200)) doubleRdd <- parallelize(sc, doublePairs, 2L) numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1), list(3L, 0)) numPairsRdd <- parallelize(sc, numPairs, length(numPairs)) strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ", "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ") strListRDD <- parallelize(sc, strList, 4) test_that("groupByKey for integers", { grouped <- groupByKey(intRdd, 2L) actual <- collectRDD(grouped) expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200))) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("groupByKey for doubles", { grouped <- groupByKey(doubleRdd, 2L) actual <- collectRDD(grouped) expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1))) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("reduceByKey for ints", { reduced <- reduceByKey(intRdd, "+", 2L) actual <- collectRDD(reduced) expected <- list(list(2L, 101), list(1L, 199)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("reduceByKey for doubles", { reduced <- reduceByKey(doubleRdd, "+", 2L) actual <- collectRDD(reduced) expected <- list(list(1.5, 199), list(2.5, 101)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("combineByKey for ints", { reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L) actual <- collectRDD(reduced) expected <- list(list(2L, 101), list(1L, 199)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("combineByKey for doubles", { reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L) actual <- collectRDD(reduced) expected <- list(list(1.5, 199), list(2.5, 101)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("combineByKey for characters", { stringKeyRDD <- parallelize(sc, list(list("max", 1L), list("min", 2L), list("other", 3L), list("max", 4L)), 2L) reduced <- combineByKey(stringKeyRDD, function(x) { x }, "+", "+", 2L) actual <- collectRDD(reduced) expected <- list(list("max", 5L), list("min", 2L), list("other", 3L)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("aggregateByKey", { # test aggregateByKey for int keys rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) zeroValue <- list(0, 0) seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) actual <- collectRDD(aggregatedRDD) expected <- list(list(1, list(3, 2)), list(2, list(7, 2))) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) # test aggregateByKey for string keys rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4))) zeroValue <- list(0, 0) seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) actual <- collectRDD(aggregatedRDD) expected <- list(list("a", list(3, 2)), list("b", list(7, 2))) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) test_that("foldByKey", { # test foldByKey for int keys folded <- foldByKey(intRdd, 0, "+", 2L) actual <- collectRDD(folded) expected <- list(list(2L, 101), list(1L, 199)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) # test foldByKey for double keys folded <- foldByKey(doubleRdd, 0, "+", 2L) actual <- collectRDD(folded) expected <- list(list(1.5, 199), list(2.5, 101)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) # test foldByKey for string keys stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200)) stringKeyRDD <- parallelize(sc, stringKeyPairs) folded <- foldByKey(stringKeyRDD, 0, "+", 2L) actual <- collectRDD(folded) expected <- list(list("b", 101), list("a", 199)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) # test foldByKey for empty pair RDD rdd <- parallelize(sc, list()) folded <- foldByKey(rdd, 0, "+", 2L) actual <- collectRDD(folded) expected <- list() expect_equal(actual, expected) # test foldByKey for RDD with only 1 pair rdd <- parallelize(sc, list(list(1, 1))) folded <- foldByKey(rdd, 0, "+", 2L) actual <- collectRDD(folded) expected <- list(list(1, 1)) expect_equal(actual, expected) }) test_that("partitionBy() partitions data correctly", { # Partition by magnitude partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 } resultRDD <- partitionByRDD(numPairsRdd, 2L, partitionByMagnitude) expected_first <- list(list(1, 100), list(2, 200)) # key less than 3 expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key greater than or equal 3 actual_first <- collectPartition(resultRDD, 0L) actual_second <- collectPartition(resultRDD, 1L) expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first)) expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second)) }) test_that("partitionBy works with dependencies", { kOne <- 1 partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 } # Partition by parity resultRDD <- partitionByRDD(numPairsRdd, numPartitions = 2L, partitionByParity) # keys even; 100 %% 2 == 0 expected_first <- list(list(2, 200), list(4, -1)) # keys odd; 3 %% 2 == 1 expected_second <- list(list(1, 100), list(3, 1), list(3, 0)) actual_first <- collectPartition(resultRDD, 0L) actual_second <- collectPartition(resultRDD, 1L) expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first)) expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second)) }) test_that("test partitionBy with string keys", { words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] }) wordCount <- lapply(words, function(word) { list(word, 1L) }) resultRDD <- partitionByRDD(wordCount, 2L) expected_first <- list(list("Dexter", 1), list("Dexter", 1)) expected_second <- list(list("and", 1), list("and", 1)) actual_first <- Filter(function(item) { item[[1]] == "Dexter" }, collectPartition(resultRDD, 0L)) actual_second <- Filter(function(item) { item[[1]] == "and" }, collectPartition(resultRDD, 1L)) expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first)) expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second)) }) sparkR.session.stop()