Diffstat (limited to 'R/pkg/inst')
 R/pkg/inst/profile/general.R                |  22 +
 R/pkg/inst/profile/shell.R                  |  31 +
 R/pkg/inst/tests/test_binaryFile.R          |  90 +
 R/pkg/inst/tests/test_binary_function.R     |  68 +
 R/pkg/inst/tests/test_broadcast.R           |  48 +
 R/pkg/inst/tests/test_context.R             |  50 +
 R/pkg/inst/tests/test_includePackage.R      |  57 +
 R/pkg/inst/tests/test_parallelize_collect.R | 109 +
 R/pkg/inst/tests/test_rdd.R                 | 644 +
 R/pkg/inst/tests/test_shuffle.R             | 209 +
 R/pkg/inst/tests/test_sparkSQL.R            | 695 +
 R/pkg/inst/tests/test_take.R                |  67 +
 R/pkg/inst/tests/test_textFile.R            | 162 +
 R/pkg/inst/tests/test_utils.R               | 137 +
 R/pkg/inst/worker/daemon.R                  |  52 +
 R/pkg/inst/worker/worker.R                  | 128 +
 16 files changed, 2569 insertions(+), 0 deletions(-)
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
new file mode 100644
index 0000000000..8fe711b622
--- /dev/null
+++ b/R/pkg/inst/profile/general.R
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
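+# .First() runs automatically at R startup; this profile prepends the SparkR
+# package built under SPARK_HOME to the library search path.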
+.First <- function() {
+ home <- Sys.getenv("SPARK_HOME")
+ .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
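+  # NOAWT is commonly used to keep the JVM from initializing AWT; no GUI is
+  # needed in SparkR sessions.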
+ Sys.setenv(NOAWT=1)
+}
diff --git a/R/pkg/inst/profile/shell.R b/R/pkg/inst/profile/shell.R
new file mode 100644
index 0000000000..7a7f203115
--- /dev/null
+++ b/R/pkg/inst/profile/shell.R
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
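+# Profile sourced by the SparkR shell: it initializes a Spark context and a
+# SQL context and exports them to the global environment as sc and sqlCtx.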
+.First <- function() {
+ home <- Sys.getenv("SPARK_HOME")
+ .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+ Sys.setenv(NOAWT=1)
+
+ library(utils)
+ library(SparkR)
+ sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
+ assign("sc", sc, envir=.GlobalEnv)
+ sqlCtx <- sparkRSQL.init(sc)
+ assign("sqlCtx", sqlCtx, envir=.GlobalEnv)
+ cat("\n Welcome to SparkR!")
+ cat("\n Spark context is available as sc, SQL context is available as sqlCtx\n")
+}
diff --git a/R/pkg/inst/tests/test_binaryFile.R b/R/pkg/inst/tests/test_binaryFile.R
new file mode 100644
index 0000000000..4bb5f58d83
--- /dev/null
+++ b/R/pkg/inst/tests/test_binaryFile.R
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions on binary files")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
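+  # Round-trip: write the RDD out with saveAsObjectFile() and read it back
+  # with objectFile(); the collected result should match the original lines.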
+ saveAsObjectFile(rdd, fileName2)
+ rdd <- objectFile(sc, fileName2)
+ expect_equal(collect(rdd), as.list(mockFile))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+
+ l <- list(1, 2, 3)
+ rdd <- parallelize(sc, l)
+ saveAsObjectFile(rdd, fileName)
+ rdd <- objectFile(sc, fileName)
+ expect_equal(collect(rdd), l)
+
+ unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
+
+ words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
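+  # Sum the per-word 1L counts with "+" across 2 partitions.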
+ counts <- reduceByKey(wordCount, "+", 2L)
+
+ saveAsObjectFile(counts, fileName2)
+ counts <- objectFile(sc, fileName2)
+
+ output <- collect(counts)
+ expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+ list("is", 2))
+ expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+
+ rdd1 <- parallelize(sc, "Spark is pretty.")
+ saveAsObjectFile(rdd1, fileName1)
+ rdd2 <- parallelize(sc, "Spark is awesome.")
+ saveAsObjectFile(rdd2, fileName2)
+
+ rdd <- objectFile(sc, c(fileName1, fileName2))
+ expect_true(count(rdd) == 2)
+
+ unlink(fileName1, recursive = TRUE)
+ unlink(fileName2, recursive = TRUE)
+})
+
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
new file mode 100644
index 0000000000..c15553ba28
--- /dev/null
+++ b/R/pkg/inst/tests/test_binary_function.R
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("binary functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+# File content
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("union on two RDDs", {
+ actual <- collect(unionRDD(rdd, rdd))
+ expect_equal(actual, as.list(rep(nums, 2)))
+
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ text.rdd <- textFile(sc, fileName)
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collect(union.rdd)
+ expect_equal(actual, c(as.list(nums), mockFile))
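+  # Unioning a byte-serialized RDD with a string RDD falls back to byte mode.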
+ expect_true(getSerializedMode(union.rdd) == "byte")
+
+  rdd <- map(text.rdd, function(x) { x })
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collect(union.rdd)
+ expect_equal(actual, as.list(c(mockFile, mockFile)))
+ expect_true(getSerializedMode(union.rdd) == "byte")
+
+ unlink(fileName)
+})
+
+test_that("cogroup on two RDDs", {
+ rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+ rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
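+  # cogroup() pairs each key with one list of values per input RDD.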
+ actual <- collect(cogroup.rdd)
+ expect_equal(actual,
+ list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
+
+ rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
+ rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+ actual <- collect(cogroup.rdd)
+
+ expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+})
diff --git a/R/pkg/inst/tests/test_broadcast.R b/R/pkg/inst/tests/test_broadcast.R
new file mode 100644
index 0000000000..fee91a427d
--- /dev/null
+++ b/R/pkg/inst/tests/test_broadcast.R
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("broadcast variables")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rrdd <- parallelize(sc, nums, 2L)
+
+test_that("using broadcast variable", {
+ randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+ randomMatBr <- broadcast(sc, randomMat)
+
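+  # value() dereferences the broadcast variable inside the worker-side closure.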
+ useBroadcast <- function(x) {
+ sum(value(randomMatBr) * x)
+ }
+ actual <- collect(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
+
+test_that("without using broadcast variable", {
+ randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+
+ useBroadcast <- function(x) {
+ sum(randomMat * x)
+ }
+ actual <- collect(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
new file mode 100644
index 0000000000..e4aab37436
--- /dev/null
+++ b/R/pkg/inst/tests/test_context.R
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("test functions in sparkR.R")
+
+test_that("repeatedly starting and stopping SparkR", {
+ for (i in 1:4) {
+ sc <- sparkR.init()
+ rdd <- parallelize(sc, 1:20, 2L)
+ expect_equal(count(rdd), 20)
+ sparkR.stop()
+ }
+})
+
+test_that("rdd GC across sparkR.stop", {
+ sparkR.stop()
+ sc <- sparkR.init() # sc should get id 0
+ rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
+ rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
+ sparkR.stop()
+
+ sc <- sparkR.init() # sc should get id 0 again
+
+ # GC rdd1 before creating rdd3 and rdd2 after
+ rm(rdd1)
+ gc()
+
+ rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
+ rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
+
+ rm(rdd2)
+ gc()
+
+ count(rdd3)
+ count(rdd4)
+})
diff --git a/R/pkg/inst/tests/test_includePackage.R b/R/pkg/inst/tests/test_includePackage.R
new file mode 100644
index 0000000000..8152b448d0
--- /dev/null
+++ b/R/pkg/inst/tests/test_includePackage.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("include R packages")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rdd <- parallelize(sc, nums, 2L)
+
+test_that("include inside function", {
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ suppressPackageStartupMessages(library(plyr))
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collect(data)
+ }
+})
+
+test_that("use include package", {
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ includePackage(sc, plyr)
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collect(data)
+ }
+})
diff --git a/R/pkg/inst/tests/test_parallelize_collect.R b/R/pkg/inst/tests/test_parallelize_collect.R
new file mode 100644
index 0000000000..fff028657d
--- /dev/null
+++ b/R/pkg/inst/tests/test_parallelize_collect.R
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("parallelize() and collect()")
+
+# Mock data
+numVector <- c(-10:97)
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
+strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
+ "violated, but I'm not. No, in fact, I think this is a friendly",
+ "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+ "I really, really do.")
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+ "other times it helps me control the chaos.",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+ "raising me. But they're both dead now. I didn't kill them. Honest.")
+
+numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
+strPairs <- list(list(strList, strList), list(strList, strList))
+
+# JavaSparkContext handle
+jsc <- sparkR.init()
+
+# Tests
+
+test_that("parallelize() on simple vectors and lists returns an RDD", {
+ numVectorRDD <- parallelize(jsc, numVector, 1)
+ numVectorRDD2 <- parallelize(jsc, numVector, 10)
+ numListRDD <- parallelize(jsc, numList, 1)
+ numListRDD2 <- parallelize(jsc, numList, 4)
+ strVectorRDD <- parallelize(jsc, strVector, 2)
+ strVectorRDD2 <- parallelize(jsc, strVector, 3)
+ strListRDD <- parallelize(jsc, strList, 4)
+ strListRDD2 <- parallelize(jsc, strList, 1)
+
+ rdds <- c(numVectorRDD,
+ numVectorRDD2,
+ numListRDD,
+ numListRDD2,
+ strVectorRDD,
+ strVectorRDD2,
+ strListRDD,
+ strListRDD2)
+
+ for (rdd in rdds) {
+ expect_true(inherits(rdd, "RDD"))
+ expect_true(.hasSlot(rdd, "jrdd")
+ && inherits(rdd@jrdd, "jobj")
+ && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
+ }
+})
+
+test_that("collect(), following a parallelize(), gives back the original collections", {
+ numVectorRDD <- parallelize(jsc, numVector, 10)
+ expect_equal(collect(numVectorRDD), as.list(numVector))
+
+ numListRDD <- parallelize(jsc, numList, 1)
+ numListRDD2 <- parallelize(jsc, numList, 4)
+ expect_equal(collect(numListRDD), as.list(numList))
+ expect_equal(collect(numListRDD2), as.list(numList))
+
+ strVectorRDD <- parallelize(jsc, strVector, 2)
+ strVectorRDD2 <- parallelize(jsc, strVector, 3)
+ expect_equal(collect(strVectorRDD), as.list(strVector))
+ expect_equal(collect(strVectorRDD2), as.list(strVector))
+
+ strListRDD <- parallelize(jsc, strList, 4)
+ strListRDD2 <- parallelize(jsc, strList, 1)
+ expect_equal(collect(strListRDD), as.list(strList))
+ expect_equal(collect(strListRDD2), as.list(strList))
+})
+
+test_that("regression: collect() following a parallelize() does not drop elements", {
+ # 10 %/% 6 = 1, ceiling(10 / 6) = 2
+ collLen <- 10
+ numPart <- 6
+ expected <- runif(collLen)
+ actual <- collect(parallelize(jsc, expected, numPart))
+ expect_equal(actual, as.list(expected))
+})
+
+test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
+  # lists of key-value pairs round-trip regardless of the partition count
+  numPairsRDD1 <- parallelize(jsc, numPairs, 1)
+  numPairsRDD2 <- parallelize(jsc, numPairs, 2)
+  numPairsRDD3 <- parallelize(jsc, numPairs, 3)
+  expect_equal(collect(numPairsRDD1), numPairs)
+  expect_equal(collect(numPairsRDD2), numPairs)
+  expect_equal(collect(numPairsRDD3), numPairs)
+  # the same holds for pairs of string lists
+  strPairsRDD1 <- parallelize(jsc, strPairs, 1)
+  strPairsRDD2 <- parallelize(jsc, strPairs, 2)
+  expect_equal(collect(strPairsRDD1), strPairs)
+  expect_equal(collect(strPairsRDD2), strPairs)
+})
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
new file mode 100644
index 0000000000..f75e0817b9
--- /dev/null
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -0,0 +1,644 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("basic RDD functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+test_that("get number of partitions in RDD", {
+ expect_equal(numPartitions(rdd), 2)
+ expect_equal(numPartitions(intRdd), 2)
+})
+
+test_that("first on RDD", {
+ expect_true(first(rdd) == 1)
+ newrdd <- lapply(rdd, function(x) x + 1)
+ expect_true(first(newrdd) == 2)
+})
+
+test_that("count and length on RDD", {
+ expect_equal(count(rdd), 10)
+ expect_equal(length(rdd), 10)
+})
+
+test_that("count by values and keys", {
+ mods <- lapply(rdd, function(x) { x %% 3 })
+ actual <- countByValue(mods)
+ expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ actual <- countByKey(intRdd)
+ expected <- list(list(2L, 2L), list(1L, 2L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("lapply on RDD", {
+ multiples <- lapply(rdd, function(x) { 2 * x })
+ actual <- collect(multiples)
+ expect_equal(actual, as.list(nums * 2))
+})
+
+test_that("lapplyPartition on RDD", {
+ sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
+ actual <- collect(sums)
+ expect_equal(actual, list(15, 40))
+})
+
+test_that("mapPartitions on RDD", {
+ sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
+ actual <- collect(sums)
+ expect_equal(actual, list(15, 40))
+})
+
+test_that("flatMap() on RDDs", {
+ flat <- flatMap(intRdd, function(x) { list(x, x) })
+ actual <- collect(flat)
+ expect_equal(actual, rep(intPairs, each=2))
+})
+
+test_that("filterRDD on RDD", {
+ filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list(2, 4, 6, 8, 10))
+
+ filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list(list(1L, -1)))
+
+ # Filter out all elements.
+ filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list())
+})
+
+test_that("lookup on RDD", {
+ vals <- lookup(intRdd, 1L)
+ expect_equal(vals, list(-1, 200))
+
+ vals <- lookup(intRdd, 3L)
+ expect_equal(vals, list())
+})
+
+test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
+ rdd2 <- rdd
+ for (i in 1:12)
+ rdd2 <- lapplyPartitionsWithIndex(
+ rdd2, function(split, part) {
+ part <- as.list(unlist(part) * split + i)
+ })
+ rdd2 <- lapply(rdd2, function(x) x + x)
+ actual <- collect(rdd2)
+ expected <- list(24, 24, 24, 24, 24,
+ 168, 170, 172, 174, 176)
+ expect_equal(actual, expected)
+})
+
+test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
+ # RDD
+ rdd2 <- rdd
+ # PipelinedRDD
+ rdd2 <- lapplyPartitionsWithIndex(
+ rdd2,
+ function(split, part) {
+ part <- as.list(unlist(part) * split)
+ })
+
+ cache(rdd2)
+ expect_true(rdd2@env$isCached)
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+
+ unpersist(rdd2)
+ expect_false(rdd2@env$isCached)
+
+ persist(rdd2, "MEMORY_AND_DISK")
+ expect_true(rdd2@env$isCached)
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+
+ unpersist(rdd2)
+ expect_false(rdd2@env$isCached)
+
+ setCheckpointDir(sc, "checkpoints")
+ checkpoint(rdd2)
+ expect_true(rdd2@env$isCheckpointed)
+
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+ expect_false(rdd2@env$isCheckpointed)
+
+ # make sure the data is collectable
+ collect(rdd2)
+
+ unlink("checkpoints")
+})
+
+test_that("reduce on RDD", {
+ sum <- reduce(rdd, "+")
+ expect_equal(sum, 55)
+
+ # Also test with an inline function
+ sumInline <- reduce(rdd, function(x, y) { x + y })
+ expect_equal(sumInline, 55)
+})
+
+test_that("lapply with dependency", {
+ fa <- 5
+ multiples <- lapply(rdd, function(x) { fa * x })
+ actual <- collect(multiples)
+
+ expect_equal(actual, as.list(nums * 5))
+})
+
+test_that("lapplyPartitionsWithIndex on RDDs", {
+ func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+ actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+ expect_equal(actual, list(list(0, 15), list(1, 40)))
+
+ pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
+ partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
+ mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+ actual <- collect(lapplyPartitionsWithIndex(
+ partitionBy(pairsRDD, 2L, partitionByParity),
+ mkTup),
+ FALSE)
+ expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
+ list(1, list(list(4, 8)))))
+})
+
+test_that("sampleRDD() on RDDs", {
+ expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+})
+
+test_that("takeSample() on RDDs", {
+ # ported from RDDSuite.scala, modified seeds
+ data <- parallelize(sc, 1:100, 2L)
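+  # Without replacement, takeSample() can return at most count(data) elements.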
+ for (seed in 4:5) {
+ s <- takeSample(data, FALSE, 20L, seed)
+ expect_equal(length(s), 20L)
+ expect_equal(length(unique(s)), 20L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, FALSE, 200L, seed)
+ expect_equal(length(s), 100L)
+ expect_equal(length(unique(s)), 100L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 20L, seed)
+ expect_equal(length(s), 20L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 100L, seed)
+ expect_equal(length(s), 100L)
+ # Chance of getting all distinct elements is astronomically low, so test we
+ # got < 100
+ expect_true(length(unique(s)) < 100L)
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 200L, seed)
+ expect_equal(length(s), 200L)
+ # Chance of getting all distinct elements is still quite low, so test we
+ # got < 100
+ expect_true(length(unique(s)) < 100L)
+ }
+})
+
+test_that("mapValues() on pairwise RDDs", {
+ multiples <- mapValues(intRdd, function(x) { x * 2 })
+ actual <- collect(multiples)
+ expected <- lapply(intPairs, function(x) {
+ list(x[[1]], x[[2]] * 2)
+ })
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("flatMapValues() on pairwise RDDs", {
+ l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+ actual <- collect(flatMapValues(l, function(x) { x }))
+ expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))
+
+ # Generate x to x+1 for every value
+ actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) }))
+ expect_equal(actual,
+ list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
+ list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
+})
+
+test_that("reduceByKeyLocally() on PairwiseRDDs", {
+ pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
+ actual <- reduceByKeyLocally(pairs, "+")
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, 6), list(1.1, 3))))
+
+ pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
+ list("bb", 5)), 4L)
+ actual <- reduceByKeyLocally(pairs, "+")
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
+})
+
+test_that("distinct() on RDDs", {
+ nums.rep2 <- rep(1:10, 2)
+ rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
+ uniques <- distinct(rdd.rep2)
+ actual <- sort(unlist(collect(uniques)))
+ expect_equal(actual, nums)
+})
+
+test_that("maximum() on RDDs", {
+ max <- maximum(rdd)
+ expect_equal(max, 10)
+})
+
+test_that("minimum() on RDDs", {
+ min <- minimum(rdd)
+ expect_equal(min, 1)
+})
+
+test_that("sumRDD() on RDDs", {
+ sum <- sumRDD(rdd)
+ expect_equal(sum, 55)
+})
+
+test_that("keyBy on RDDs", {
+ func <- function(x) { x*x }
+ keys <- keyBy(rdd, func)
+ actual <- collect(keys)
+ expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
+})
+
+test_that("repartition/coalesce on RDDs", {
+ rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
+
+ # repartition
+ r1 <- repartition(rdd, 2)
+ expect_equal(numPartitions(r1), 2L)
+ count <- length(collectPartition(r1, 0L))
+ expect_true(count >= 8 && count <= 12)
+
+ r2 <- repartition(rdd, 6)
+ expect_equal(numPartitions(r2), 6L)
+ count <- length(collectPartition(r2, 0L))
+  expect_true(count >= 0 && count <= 4)
+
+ # coalesce
+ r3 <- coalesce(rdd, 1)
+ expect_equal(numPartitions(r3), 1L)
+ count <- length(collectPartition(r3, 0L))
+ expect_equal(count, 20)
+})
+
+test_that("sortBy() on RDDs", {
+ sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
+ actual <- collect(sortedRdd)
+ expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
+
+ rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+ sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
+ actual <- collect(sortedRdd2)
+ expect_equal(actual, as.list(nums))
+})
+
+test_that("takeOrdered() on RDDs", {
+ l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+ rdd <- parallelize(sc, l)
+ actual <- takeOrdered(rdd, 6L)
+ expect_equal(actual, as.list(sort(unlist(l)))[1:6])
+
+ l <- list("e", "d", "c", "d", "a")
+ rdd <- parallelize(sc, l)
+ actual <- takeOrdered(rdd, 3L)
+ expect_equal(actual, as.list(sort(unlist(l)))[1:3])
+})
+
+test_that("top() on RDDs", {
+ l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+ rdd <- parallelize(sc, l)
+ actual <- top(rdd, 6L)
+ expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
+
+ l <- list("e", "d", "c", "d", "a")
+ rdd <- parallelize(sc, l)
+ actual <- top(rdd, 3L)
+ expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
+})
+
+test_that("fold() on RDDs", {
+ actual <- fold(rdd, 0, "+")
+ expect_equal(actual, Reduce("+", nums, 0))
+
+ rdd <- parallelize(sc, list())
+ actual <- fold(rdd, 0, "+")
+ expect_equal(actual, 0)
+})
+
+test_that("aggregateRDD() on RDDs", {
+ rdd <- parallelize(sc, list(1, 2, 3, 4))
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
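+  # zeroValue is a (sum, count) pair: seqOp folds one element into it and
+  # combOp merges partial (sum, count) pairs across partitions.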
+ actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+ expect_equal(actual, list(10, 4))
+
+ rdd <- parallelize(sc, list())
+ actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+ expect_equal(actual, list(0, 0))
+})
+
+test_that("zipWithUniqueId() on RDDs", {
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
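+  # With n partitions, the i-th element of partition k gets id k + i * n;
+  # here n = 3, so partition 0 yields ids 0 and 3, partition 1 yields 1 and 4.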
+ actual <- collect(zipWithUniqueId(rdd))
+ expected <- list(list("a", 0), list("b", 3), list("c", 1),
+ list("d", 4), list("e", 2))
+ expect_equal(actual, expected)
+
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+ actual <- collect(zipWithUniqueId(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+})
+
+test_that("zipWithIndex() on RDDs", {
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
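+  # zipWithIndex() assigns consecutive global indices in partition order.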
+ actual <- collect(zipWithIndex(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+ actual <- collect(zipWithIndex(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+})
+
+test_that("glom() on RDD", {
+ rdd <- parallelize(sc, as.list(1:4), 2L)
+ actual <- collect(glom(rdd))
+ expect_equal(actual, list(list(1, 2), list(3, 4)))
+})
+
+test_that("keys() on RDDs", {
+ keys <- keys(intRdd)
+ actual <- collect(keys)
+ expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
+})
+
+test_that("values() on RDDs", {
+ values <- values(intRdd)
+ actual <- collect(values)
+ expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
+})
+
+test_that("pipeRDD() on RDDs", {
+ actual <- collect(pipeRDD(rdd, "more"))
+ expected <- as.list(as.character(1:10))
+ expect_equal(actual, expected)
+
+ trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
+ actual <- collect(pipeRDD(trailed.rdd, "sort"))
+ expected <- list("", "1", "2", "3")
+ expect_equal(actual, expected)
+
+ rev.nums <- 9:0
+ rev.rdd <- parallelize(sc, rev.nums, 2L)
+ actual <- collect(pipeRDD(rev.rdd, "sort"))
+ expected <- as.list(as.character(c(5:9, 0:4)))
+ expect_equal(actual, expected)
+})
+
+test_that("zipRDD() on RDDs", {
+ rdd1 <- parallelize(sc, 0:4, 2)
+ rdd2 <- parallelize(sc, 1000:1004, 2)
+ actual <- collect(zipRDD(rdd1, rdd2))
+ expect_equal(actual,
+ list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
+
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collect(zipRDD(rdd, rdd))
+  expected <- lapply(mockFile, function(x) { list(x, x) })
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collect(zipRDD(rdd1, rdd))
+ expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
+ expect_equal(actual, expected)
+
+ rdd1 <- map(rdd, function(x) { x })
+ actual <- collect(zipRDD(rdd, rdd1))
+ expected <- lapply(mockFile, function(x) { list(x, x) })
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})
+
+test_that("join() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+ rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+ rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(actual, list())
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(actual, list())
+})
+
+test_that("leftOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+ rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+ rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+})
+
+test_that("rightOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,2), list(1,3)))
+ rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",2), list("a",3)))
+ rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("fullOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
+ rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
+ rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("sortByKey() on pairwise RDDs", {
+  numPairsRdd <- map(rdd, function(x) { list(x, x) })
+  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
+  actual <- collect(sortedRdd)
+  numPairs <- lapply(nums, function(x) { list(x, x) })
+  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
+
+  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+  numPairsRdd2 <- map(rdd2, function(x) { list(x, x) })
+ sortedRdd2 <- sortByKey(numPairsRdd2)
+ actual <- collect(sortedRdd2)
+ expect_equal(actual, numPairs)
+
+ # sort by string keys
+ l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
+ rdd3 <- parallelize(sc, l, 2L)
+ sortedRdd3 <- sortByKey(rdd3)
+ actual <- collect(sortedRdd3)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # test on the boundary cases
+
+ # boundary case 1: the RDD to be sorted has only 1 partition
+ rdd4 <- parallelize(sc, l, 1L)
+ sortedRdd4 <- sortByKey(rdd4)
+ actual <- collect(sortedRdd4)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # boundary case 2: the sorted RDD has only 1 partition
+ rdd5 <- parallelize(sc, l, 2L)
+ sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
+ actual <- collect(sortedRdd5)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # boundary case 3: the RDD to be sorted has only 1 element
+ l2 <- list(list("a", 1))
+ rdd6 <- parallelize(sc, l2, 2L)
+ sortedRdd6 <- sortByKey(rdd6)
+ actual <- collect(sortedRdd6)
+ expect_equal(actual, l2)
+
+ # boundary case 4: the RDD to be sorted has 0 element
+ l3 <- list()
+ rdd7 <- parallelize(sc, l3, 2L)
+ sortedRdd7 <- sortByKey(rdd7)
+ actual <- collect(sortedRdd7)
+ expect_equal(actual, l3)
+})
+
+test_that("collectAsMap() on a pairwise RDD", {
+ rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1` = 2, `3` = 4))
+
+ rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(a = 1, b = 2))
+
+ rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
+
+ rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1` = "a", `2` = "b"))
+})
diff --git a/R/pkg/inst/tests/test_shuffle.R b/R/pkg/inst/tests/test_shuffle.R
new file mode 100644
index 0000000000..d1da8232ae
--- /dev/null
+++ b/R/pkg/inst/tests/test_shuffle.R
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("partitionBy, groupByKey, reduceByKey etc.")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
+doubleRdd <- parallelize(sc, doublePairs, 2L)
+
+numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
+ list(3L, 0))
+numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
+
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
+strListRDD <- parallelize(sc, strList, 4)
+
+test_that("groupByKey for integers", {
+ grouped <- groupByKey(intRdd, 2L)
+
+ actual <- collect(grouped)
+
+ expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("groupByKey for doubles", {
+ grouped <- groupByKey(doubleRdd, 2L)
+
+ actual <- collect(grouped)
+
+ expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for ints", {
+ reduced <- reduceByKey(intRdd, "+", 2L)
+
+ actual <- collect(reduced)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for doubles", {
+ reduced <- reduceByKey(doubleRdd, "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for ints", {
+ reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
+
+ actual <- collect(reduced)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for doubles", {
+ reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("aggregateByKey", {
+ # test aggregateByKey for int keys
+ rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+ aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+ actual <- collect(aggregatedRDD)
+
+ expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test aggregateByKey for string keys
+ rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
+
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+ aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+ actual <- collect(aggregatedRDD)
+
+ expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("foldByKey", {
+ # test foldByKey for int keys
+ folded <- foldByKey(intRdd, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for double keys
+ folded <- foldByKey(doubleRdd, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for string keys
+ stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
+
+ stringKeyRDD <- parallelize(sc, stringKeyPairs)
+ folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list("b", 101), list("a", 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for empty pair RDD
+ rdd <- parallelize(sc, list())
+ folded <- foldByKey(rdd, 0, "+", 2L)
+ actual <- collect(folded)
+ expected <- list()
+ expect_equal(actual, expected)
+
+ # test foldByKey for RDD with only 1 pair
+ rdd <- parallelize(sc, list(list(1, 1)))
+ folded <- foldByKey(rdd, 0, "+", 2L)
+ actual <- collect(folded)
+ expected <- list(list(1, 1))
+ expect_equal(actual, expected)
+})
+
+test_that("partitionBy() partitions data correctly", {
+ # Partition by magnitude
+ partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
+
+ resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
+
+ expected_first <- list(list(1, 100), list(2, 200)) # key < 3
+ expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key >= 3
+ actual_first <- collectPartition(resultRDD, 0L)
+ actual_second <- collectPartition(resultRDD, 1L)
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("partitionBy works with dependencies", {
+ kOne <- 1
+ partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
+
+ # Partition by parity
+ resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
+
+  # even keys (2 and 4) go to partition 0
+  expected_first <- list(list(2, 200), list(4, -1))
+  # odd keys (1 and 3) go to partition 1
+  expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
+ actual_first <- collectPartition(resultRDD, 0L)
+ actual_second <- collectPartition(resultRDD, 1L)
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("test partitionBy with string keys", {
+ words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ resultRDD <- partitionBy(wordCount, 2L)
+ expected_first <- list(list("Dexter", 1), list("Dexter", 1))
+ expected_second <- list(list("and", 1), list("and", 1))
+
+ actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
+ collectPartition(resultRDD, 0L))
+ actual_second <- Filter(function(item) { item[[1]] == "and" },
+ collectPartition(resultRDD, 1L))
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
new file mode 100644
index 0000000000..cf5cf6d169
--- /dev/null
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -0,0 +1,695 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("SparkSQL functions")
+
+# Tests for SparkSQL functions in SparkR
+
+sc <- sparkR.init()
+
+sqlCtx <- sparkRSQL.init(sc)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+ "{\"name\":\"Andy\", \"age\":30}",
+ "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
+writeLines(mockLines, jsonPath)
+
+test_that("infer types", {
+ expect_equal(infer_type(1L), "integer")
+ expect_equal(infer_type(1.0), "double")
+ expect_equal(infer_type("abc"), "string")
+ expect_equal(infer_type(TRUE), "boolean")
+ expect_equal(infer_type(as.Date("2015-03-11")), "date")
+ expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
+ expect_equal(infer_type(c(1L, 2L)),
+ list(type = 'array', elementType = "integer", containsNull = TRUE))
+ expect_equal(infer_type(list(1L, 2L)),
+ list(type = 'array', elementType = "integer", containsNull = TRUE))
+ expect_equal(infer_type(list(a = 1L, b = "2")),
+ list(type = "struct",
+ fields = list(list(name = "a", type = "integer", nullable = TRUE),
+ list(name = "b", type = "string", nullable = TRUE))))
+ e <- new.env()
+ assign("a", 1L, envir = e)
+ expect_equal(infer_type(e),
+ list(type = "map", keyType = "string", valueType = "integer",
+ valueContainsNull = TRUE))
+})
+
+test_that("create DataFrame from RDD", {
+ rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+ df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 10)
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+ df <- createDataFrame(sqlCtx, rdd)
+ expect_true(inherits(df, "DataFrame"))
+ expect_equal(columns(df), c("_1", "_2"))
+
+ fields <- list(list(name = "a", type = "integer", nullable = TRUE),
+ list(name = "b", type = "string", nullable = TRUE))
+ schema <- list(type = "struct", fields = fields)
+ df <- createDataFrame(sqlCtx, rdd, schema)
+ expect_true(inherits(df, "DataFrame"))
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+ rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
+ df <- createDataFrame(sqlCtx, rdd)
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 10)
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+})
+
+test_that("toDF", {
+ rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+ df <- toDF(rdd, list("a", "b"))
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 10)
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+ df <- toDF(rdd)
+ expect_true(inherits(df, "DataFrame"))
+ expect_equal(columns(df), c("_1", "_2"))
+
+ fields <- list(list(name = "a", type = "integer", nullable = TRUE),
+ list(name = "b", type = "string", nullable = TRUE))
+ schema <- list(type = "struct", fields = fields)
+ df <- toDF(rdd, schema)
+ expect_true(inherits(df, "DataFrame"))
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+ rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
+ df <- toDF(rdd)
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 10)
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+})
+
+test_that("create DataFrame from list or data.frame", {
+ l <- list(list(1, 2), list(3, 4))
+ df <- createDataFrame(sqlCtx, l, c("a", "b"))
+ expect_equal(columns(df), c("a", "b"))
+
+ l <- list(list(a=1, b=2), list(a=3, b=4))
+ df <- createDataFrame(sqlCtx, l)
+ expect_equal(columns(df), c("a", "b"))
+
+ a <- 1:3
+ b <- c("a", "b", "c")
+ ldf <- data.frame(a, b)
+ df <- createDataFrame(sqlCtx, ldf)
+ expect_equal(columns(df), c("a", "b"))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+ expect_equal(count(df), 3)
+ ldf2 <- collect(df)
+ expect_equal(ldf$a, ldf2$a)
+})
+
+test_that("create DataFrame with different data types", {
+ l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
+ f = as.POSIXct("2015-03-15 12:13:14.056"))
+ df <- createDataFrame(sqlCtx, list(l))
+ expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
+ c("d", "string"), c("e", "date"), c("f", "timestamp")))
+ expect_equal(count(df), 1)
+ expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
+})
+
+# TODO: enable this test after fix serialization for nested object
+#test_that("create DataFrame with nested array and struct", {
+# e <- new.env()
+# assign("n", 3L, envir = e)
+# l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L))
+# df <- createDataFrame(sqlCtx, list(l), c("a", "b", "c", "d"))
+# expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>"),
+# c("c", "map<string,int>"), c("d", "struct<a:string,b:int>")))
+# expect_equal(count(df), 1)
+# ldf <- collect(df)
+# expect_equal(ldf[1,], l[[1]])
+#})
+
+test_that("jsonFile() on a local file returns a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 3)
+})
+
+test_that("jsonRDD() on a RDD with json string", {
+ rdd <- parallelize(sc, mockLines)
+ expect_true(count(rdd) == 3)
+ df <- jsonRDD(sqlCtx, rdd)
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 3)
+
+ rdd2 <- flatMap(rdd, function(x) c(x, x))
+ df <- jsonRDD(sqlCtx, rdd2)
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 6)
+})
+
+test_that("test cache, uncache and clearCache", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ registerTempTable(df, "table1")
+ cacheTable(sqlCtx, "table1")
+ uncacheTable(sqlCtx, "table1")
+ clearCache(sqlCtx)
+ dropTempTable(sqlCtx, "table1")
+})
+
+test_that("test tableNames and tables", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ registerTempTable(df, "table1")
+ expect_true(length(tableNames(sqlCtx)) == 1)
+ df <- tables(sqlCtx)
+ expect_true(count(df) == 1)
+ dropTempTable(sqlCtx, "table1")
+})
+
+test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ registerTempTable(df, "table1")
+ newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'")
+ expect_true(inherits(newdf, "DataFrame"))
+ expect_true(count(newdf) == 1)
+ dropTempTable(sqlCtx, "table1")
+})
+
+test_that("insertInto() on a registered table", {
+ df <- loadDF(sqlCtx, jsonPath, "json")
+ saveDF(df, parquetPath, "parquet", "overwrite")
+ dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+
+ lines <- c("{\"name\":\"Bob\", \"age\":24}",
+ "{\"name\":\"James\", \"age\":35}")
+ jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
+ parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
+ writeLines(lines, jsonPath2)
+ df2 <- loadDF(sqlCtx, jsonPath2, "json")
+ saveDF(df2, parquetPath2, "parquet", "overwrite")
+ dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+
+ registerTempTable(dfParquet, "table1")
+ insertInto(dfParquet2, "table1")
+ expect_true(count(sql(sqlCtx, "select * from table1")) == 5)
+ expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael")
+ dropTempTable(sqlCtx, "table1")
+
+ registerTempTable(dfParquet, "table1")
+ insertInto(dfParquet2, "table1", overwrite = TRUE)
+ expect_true(count(sql(sqlCtx, "select * from table1")) == 2)
+ expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob")
+ dropTempTable(sqlCtx, "table1")
+})
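+
+# (Descriptive note: insertInto() appends to the target table by default;
+# overwrite = TRUE replaces its contents, as the two halves of the test above
+# demonstrate.)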
+
+test_that("table() returns a new DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ registerTempTable(df, "table1")
+ tabledf <- table(sqlCtx, "table1")
+ expect_true(inherits(tabledf, "DataFrame"))
+ expect_true(count(tabledf) == 3)
+ dropTempTable(sqlCtx, "table1")
+})
+
+test_that("toRDD() returns an RRDD", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ testRDD <- toRDD(df)
+ expect_true(inherits(testRDD, "RDD"))
+ expect_true(count(testRDD) == 3)
+})
+
+test_that("union on two RDDs created from DataFrames returns an RRDD", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ RDD1 <- toRDD(df)
+ RDD2 <- toRDD(df)
+ unioned <- unionRDD(RDD1, RDD2)
+ expect_true(inherits(unioned, "RDD"))
+ expect_true(SparkR:::getSerializedMode(unioned) == "byte")
+ expect_true(collect(unioned)[[2]]$name == "Andy")
+})
+
+test_that("union on mixed serialization types correctly returns a byte RRDD", {
+ # Byte RDD
+ nums <- 1:10
+ rdd <- parallelize(sc, nums, 2L)
+
+ # String RDD
+ textLines <- c("Michael",
+ "Andy, 30",
+ "Justin, 19")
+ textPath <- tempfile(pattern="sparkr-textLines", fileext=".tmp")
+ writeLines(textLines, textPath)
+ textRDD <- textFile(sc, textPath)
+
+ df <- jsonFile(sqlCtx, jsonPath)
+ dfRDD <- toRDD(df)
+
+ unionByte <- unionRDD(rdd, dfRDD)
+ expect_true(inherits(unionByte, "RDD"))
+ expect_true(SparkR:::getSerializedMode(unionByte) == "byte")
+ expect_true(collect(unionByte)[[1]] == 1)
+ expect_true(collect(unionByte)[[12]]$name == "Andy")
+
+ unionString <- unionRDD(textRDD, dfRDD)
+ expect_true(inherits(unionString, "RDD"))
+ expect_true(SparkR:::getSerializedMode(unionString) == "byte")
+ expect_true(collect(unionString)[[1]] == "Michael")
+ expect_true(collect(unionString)[[5]]$name == "Andy")
+})
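+
+# (Descriptive note: when the two inputs disagree on serialization mode,
+# unionRDD() re-serializes the string-mode side so the result is uniformly
+# "byte" -- which is what both getSerializedMode() assertions check.)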
+
+test_that("objectFile() works with row serialization", {
+ objectPath <- tempfile(pattern="spark-test", fileext=".tmp")
+ df <- jsonFile(sqlCtx, jsonPath)
+ dfRDD <- toRDD(df)
+ saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+ objectIn <- objectFile(sc, objectPath)
+
+ expect_true(inherits(objectIn, "RDD"))
+ expect_equal(SparkR:::getSerializedMode(objectIn), "byte")
+ expect_equal(collect(objectIn)[[2]]$age, 30)
+})
+
+test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ testRDD <- lapply(df, function(row) {
+ row$newCol <- row$age + 5
+ row
+ })
+ expect_true(inherits(testRDD, "RDD"))
+ collected <- collect(testRDD)
+ expect_true(collected[[1]]$name == "Michael")
+  expect_true(collected[[2]]$newCol == 35)
+})
+
+test_that("collect() returns a data.frame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ rdf <- collect(df)
+ expect_true(is.data.frame(rdf))
+ expect_true(names(rdf)[1] == "age")
+ expect_true(nrow(rdf) == 3)
+ expect_true(ncol(rdf) == 2)
+})
+
+test_that("limit() returns DataFrame with the correct number of rows", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ dfLimited <- limit(df, 2)
+ expect_true(inherits(dfLimited, "DataFrame"))
+ expect_true(count(dfLimited) == 2)
+})
+
+test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_true(nrow(collect(df)) == nrow(take(df, 10)))
+ expect_true(ncol(collect(df)) == ncol(take(df, 10)))
+})
+
+test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ first <- lapply(df, function(row) {
+ row$age <- row$age + 5
+ row
+ })
+ second <- lapply(first, function(row) {
+    row$testCol <- if (!is.na(row$age) && row$age == 35) TRUE else FALSE
+ row
+ })
+ expect_true(inherits(second, "RDD"))
+ expect_true(count(second) == 3)
+ expect_true(collect(second)[[2]]$age == 35)
+ expect_true(collect(second)[[2]]$testCol)
+ expect_false(collect(second)[[3]]$testCol)
+})
+
+test_that("cache(), persist(), and unpersist() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_false(df@env$isCached)
+ cache(df)
+ expect_true(df@env$isCached)
+
+ unpersist(df)
+ expect_false(df@env$isCached)
+
+ persist(df, "MEMORY_AND_DISK")
+ expect_true(df@env$isCached)
+
+ unpersist(df)
+ expect_false(df@env$isCached)
+
+ # make sure the data is collectable
+ expect_true(is.data.frame(collect(df)))
+})
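+
+# (Descriptive note: persist() takes a standard Spark storage-level name such
+# as "MEMORY_AND_DISK", while cache() uses the default storage level; both set
+# the local isCached flag checked above, and the data itself is materialized
+# lazily on the next action.)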
+
+test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ testSchema <- schema(df)
+ expect_true(length(testSchema$fields()) == 2)
+ expect_true(testSchema$fields()[[1]]$dataType.toString() == "LongType")
+ expect_true(testSchema$fields()[[2]]$dataType.simpleString() == "string")
+ expect_true(testSchema$fields()[[1]]$name() == "age")
+
+ testTypes <- dtypes(df)
+ expect_true(length(testTypes[[1]]) == 2)
+ expect_true(testTypes[[1]][1] == "age")
+
+ testCols <- columns(df)
+ expect_true(length(testCols) == 2)
+ expect_true(testCols[2] == "name")
+
+ testNames <- names(df)
+ expect_true(length(testNames) == 2)
+ expect_true(testNames[2] == "name")
+})
+
+test_that("head() and first() return the correct data", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ testHead <- head(df)
+ expect_true(nrow(testHead) == 3)
+ expect_true(ncol(testHead) == 2)
+
+ testHead2 <- head(df, 2)
+ expect_true(nrow(testHead2) == 2)
+ expect_true(ncol(testHead2) == 2)
+
+ testFirst <- first(df)
+ expect_true(nrow(testFirst) == 1)
+})
+
+test_that("distinct() on DataFrames", {
+ lines <- c("{\"name\":\"Michael\"}",
+ "{\"name\":\"Andy\", \"age\":30}",
+ "{\"name\":\"Justin\", \"age\":19}",
+ "{\"name\":\"Justin\", \"age\":19}")
+ jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
+ writeLines(lines, jsonPathWithDup)
+
+ df <- jsonFile(sqlCtx, jsonPathWithDup)
+ uniques <- distinct(df)
+ expect_true(inherits(uniques, "DataFrame"))
+ expect_true(count(uniques) == 3)
+})
+
+test_that("sampleDF on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ sampled <- sampleDF(df, FALSE, 1.0)
+ expect_equal(nrow(collect(sampled)), count(df))
+ expect_true(inherits(sampled, "DataFrame"))
+ sampled2 <- sampleDF(df, FALSE, 0.1)
+ expect_true(count(sampled2) < 3)
+})
+
+test_that("select operators", {
+ df <- select(jsonFile(sqlCtx, jsonPath), "name", "age")
+ expect_true(inherits(df$name, "Column"))
+ expect_true(inherits(df[[2]], "Column"))
+ expect_true(inherits(df[["age"]], "Column"))
+
+ expect_true(inherits(df[,1], "DataFrame"))
+ expect_equal(columns(df[,1]), c("name"))
+ expect_equal(columns(df[,"age"]), c("age"))
+ df2 <- df[,c("age", "name")]
+ expect_true(inherits(df2, "DataFrame"))
+ expect_equal(columns(df2), c("age", "name"))
+
+ df$age2 <- df$age
+ expect_equal(columns(df), c("name", "age", "age2"))
+ expect_equal(count(where(df, df$age2 == df$age)), 2)
+ df$age2 <- df$age * 2
+ expect_equal(columns(df), c("name", "age", "age2"))
+ expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
+})
+
+test_that("select with column", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ df1 <- select(df, "name")
+ expect_true(columns(df1) == c("name"))
+ expect_true(count(df1) == 3)
+
+ df2 <- select(df, df$age)
+ expect_true(columns(df2) == c("age"))
+ expect_true(count(df2) == 3)
+})
+
+test_that("selectExpr() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ selected <- selectExpr(df, "age * 2")
+ expect_true(names(selected) == "(age * 2)")
+ expect_equal(collect(selected), collect(select(df, df$age * 2L)))
+
+ selected2 <- selectExpr(df, "name as newName", "abs(age) as age")
+ expect_equal(names(selected2), c("newName", "age"))
+ expect_true(count(selected2) == 3)
+})
+
+test_that("column calculation", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ d <- collect(select(df, alias(df$age + 1, "age2")))
+ expect_true(names(d) == c("age2"))
+ df2 <- select(df, lower(df$name), abs(df$age))
+ expect_true(inherits(df2, "DataFrame"))
+ expect_true(count(df2) == 3)
+})
+
+test_that("load() from json file", {
+ df <- loadDF(sqlCtx, jsonPath, "json")
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 3)
+})
+
+test_that("save() as parquet file", {
+ df <- loadDF(sqlCtx, jsonPath, "json")
+ saveDF(df, parquetPath, "parquet", mode="overwrite")
+ df2 <- loadDF(sqlCtx, parquetPath, "parquet")
+ expect_true(inherits(df2, "DataFrame"))
+ expect_true(count(df2) == 3)
+})
+
+test_that("test HiveContext", {
+ hiveCtx <- tryCatch({
+ newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+ }, error = function(err) {
+ skip("Hive is not build with SparkSQL, skipped")
+ })
+ df <- createExternalTable(hiveCtx, "json", jsonPath, "json")
+ expect_true(inherits(df, "DataFrame"))
+ expect_true(count(df) == 3)
+ df2 <- sql(hiveCtx, "select * from json")
+ expect_true(inherits(df2, "DataFrame"))
+ expect_true(count(df2) == 3)
+
+ jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
+ saveAsTable(df, "json", "json", "append", path = jsonPath2)
+ df3 <- sql(hiveCtx, "select * from json")
+ expect_true(inherits(df3, "DataFrame"))
+ expect_true(count(df3) == 6)
+})
+
+test_that("column operators", {
+ c <- SparkR:::col("a")
+ c2 <- (- c + 1 - 2) * 3 / 4.0
+ c3 <- (c + c2 - c2) * c2 %% c2
+ c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3)
+})
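+
+# (Descriptive note: the "column operators" test above and the "column
+# functions" test below intentionally contain no expectations -- each operator
+# or function call constructs a Column expression through the JVM, so
+# completing without an error is the assertion.)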
+
+test_that("column functions", {
+ c <- SparkR:::col("a")
+ c2 <- min(c) + max(c) + sum(c) + avg(c) + count(c) + abs(c) + sqrt(c)
+ c3 <- lower(c) + upper(c) + first(c) + last(c)
+ c4 <- approxCountDistinct(c) + countDistinct(c) + cast(c, "string")
+})
+
+test_that("string operators", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_equal(count(where(df, like(df$name, "A%"))), 1)
+ expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
+ expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
+ expect_equal(collect(select(df, cast(df$age, "string")))[[2, 1]], "30")
+})
+
+test_that("group by", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ df1 <- agg(df, name = "max", age = "sum")
+ expect_true(1 == count(df1))
+ df1 <- agg(df, age2 = max(df$age))
+ expect_true(1 == count(df1))
+ expect_equal(columns(df1), c("age2"))
+
+ gd <- groupBy(df, "name")
+ expect_true(inherits(gd, "GroupedData"))
+ df2 <- count(gd)
+ expect_true(inherits(df2, "DataFrame"))
+ expect_true(3 == count(df2))
+
+ df3 <- agg(gd, age = "sum")
+ expect_true(inherits(df3, "DataFrame"))
+ expect_true(3 == count(df3))
+
+ df3 <- agg(gd, age = sum(df$age))
+ expect_true(inherits(df3, "DataFrame"))
+ expect_true(3 == count(df3))
+ expect_equal(columns(df3), c("name", "age"))
+
+ df4 <- sum(gd, "age")
+ expect_true(inherits(df4, "DataFrame"))
+ expect_true(3 == count(df4))
+ expect_true(3 == count(mean(gd, "age")))
+ expect_true(3 == count(max(gd, "age")))
+})
+
+test_that("sortDF() and orderBy() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ sorted <- sortDF(df, df$age)
+ expect_true(collect(sorted)[1,2] == "Michael")
+
+ sorted2 <- sortDF(df, "name")
+ expect_true(collect(sorted2)[2,"age"] == 19)
+
+ sorted3 <- orderBy(df, asc(df$age))
+ expect_true(is.na(first(sorted3)$age))
+ expect_true(collect(sorted3)[2, "age"] == 19)
+
+ sorted4 <- orderBy(df, desc(df$name))
+ expect_true(first(sorted4)$name == "Michael")
+ expect_true(collect(sorted4)[3,"name"] == "Andy")
+})
+
+test_that("filter() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ filtered <- filter(df, "age > 20")
+ expect_true(count(filtered) == 1)
+ expect_true(collect(filtered)$name == "Andy")
+ filtered2 <- where(df, df$name != "Michael")
+ expect_true(count(filtered2) == 2)
+ expect_true(collect(filtered2)$age[2] == 19)
+})
+
+test_that("join() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+
+ mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
+ "{\"name\":\"Andy\", \"test\": \"no\"}",
+ "{\"name\":\"Justin\", \"test\": \"yes\"}",
+ "{\"name\":\"Bob\", \"test\": \"yes\"}")
+ jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
+ writeLines(mockLines2, jsonPath2)
+ df2 <- jsonFile(sqlCtx, jsonPath2)
+
+ joined <- join(df, df2)
+ expect_equal(names(joined), c("age", "name", "name", "test"))
+ expect_true(count(joined) == 12)
+
+ joined2 <- join(df, df2, df$name == df2$name)
+ expect_equal(names(joined2), c("age", "name", "name", "test"))
+ expect_true(count(joined2) == 3)
+
+ joined3 <- join(df, df2, df$name == df2$name, "right_outer")
+ expect_equal(names(joined3), c("age", "name", "name", "test"))
+ expect_true(count(joined3) == 4)
+ expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
+
+ joined4 <- select(join(df, df2, df$name == df2$name, "outer"),
+ alias(df$age + 5, "newAge"), df$name, df2$test)
+ expect_equal(names(joined4), c("newAge", "name", "test"))
+ expect_true(count(joined4) == 4)
+ expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
+})
+
+test_that("toJSON() returns an RDD of the correct values", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ testRDD <- toJSON(df)
+ expect_true(inherits(testRDD, "RDD"))
+ expect_true(SparkR:::getSerializedMode(testRDD) == "string")
+ expect_equal(collect(testRDD)[[1]], mockLines[1])
+})
+
+test_that("showDF()", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_output(showDF(df), "age name \nnull Michael\n30 Andy \n19 Justin ")
+})
+
+test_that("isLocal()", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ expect_false(isLocal(df))
+})
+
+test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
+ df <- jsonFile(sqlCtx, jsonPath)
+
+ lines <- c("{\"name\":\"Bob\", \"age\":24}",
+ "{\"name\":\"Andy\", \"age\":30}",
+ "{\"name\":\"James\", \"age\":35}")
+ jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
+ writeLines(lines, jsonPath2)
+ df2 <- loadDF(sqlCtx, jsonPath2, "json")
+
+ unioned <- sortDF(unionAll(df, df2), df$age)
+ expect_true(inherits(unioned, "DataFrame"))
+ expect_true(count(unioned) == 6)
+ expect_true(first(unioned)$name == "Michael")
+
+ subtracted <- sortDF(subtract(df, df2), desc(df$age))
+ expect_true(inherits(unioned, "DataFrame"))
+ expect_true(count(subtracted) == 2)
+ expect_true(first(subtracted)$name == "Justin")
+
+ intersected <- sortDF(intersect(df, df2), df$age)
+ expect_true(inherits(unioned, "DataFrame"))
+ expect_true(count(intersected) == 1)
+ expect_true(first(intersected)$name == "Andy")
+})
+
+test_that("withColumn() and withColumnRenamed()", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ newDF <- withColumn(df, "newAge", df$age + 2)
+ expect_true(length(columns(newDF)) == 3)
+ expect_true(columns(newDF)[3] == "newAge")
+ expect_true(first(filter(newDF, df$name != "Michael"))$newAge == 32)
+
+ newDF2 <- withColumnRenamed(df, "age", "newerAge")
+ expect_true(length(columns(newDF2)) == 2)
+ expect_true(columns(newDF2)[1] == "newerAge")
+})
+
+test_that("saveDF() on DataFrame and works with parquetFile", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ saveDF(df, parquetPath, "parquet", mode="overwrite")
+ parquetDF <- parquetFile(sqlCtx, parquetPath)
+ expect_true(inherits(parquetDF, "DataFrame"))
+ expect_equal(count(df), count(parquetDF))
+})
+
+test_that("parquetFile works with multiple input paths", {
+ df <- jsonFile(sqlCtx, jsonPath)
+ saveDF(df, parquetPath, "parquet", mode="overwrite")
+ parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
+ saveDF(df, parquetPath2, "parquet", mode="overwrite")
+ parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
+ expect_true(inherits(parquetDF, "DataFrame"))
+ expect_true(count(parquetDF) == count(df)*2)
+})
+
+unlink(parquetPath)
+unlink(jsonPath)
diff --git a/R/pkg/inst/tests/test_take.R b/R/pkg/inst/tests/test_take.R
new file mode 100644
index 0000000000..7f4c7c315d
--- /dev/null
+++ b/R/pkg/inst/tests/test_take.R
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("tests RDD function take()")
+
+# Mock data
+numVector <- c(-10:97)
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ^ 10)
+strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
+ "violated, but I'm not. No, in fact, I think this is a friendly",
+ "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+ "I really, really do.")
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+ "other times it helps me control the chaos.",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+ "raising me. But they're both dead now. I didn't kill them. Honest.")
+
+# JavaSparkContext handle
+jsc <- sparkR.init()
+
+test_that("take() gives back the original elements in correct count and order", {
+ numVectorRDD <- parallelize(jsc, numVector, 10)
+ # case: number of elements to take is less than the size of the first partition
+ expect_equal(take(numVectorRDD, 1), as.list(head(numVector, n = 1)))
+ # case: number of elements to take is the same as the size of the first partition
+ expect_equal(take(numVectorRDD, 11), as.list(head(numVector, n = 11)))
+ # case: number of elements to take is greater than all elements
+ expect_equal(take(numVectorRDD, length(numVector)), as.list(numVector))
+ expect_equal(take(numVectorRDD, length(numVector) + 1), as.list(numVector))
+
+ numListRDD <- parallelize(jsc, numList, 1)
+ numListRDD2 <- parallelize(jsc, numList, 4)
+ expect_equal(take(numListRDD, 3), take(numListRDD2, 3))
+ expect_equal(take(numListRDD, 5), take(numListRDD2, 5))
+ expect_equal(take(numListRDD, 1), as.list(head(numList, n = 1)))
+ expect_equal(take(numListRDD2, 999), numList)
+
+ strVectorRDD <- parallelize(jsc, strVector, 2)
+ strVectorRDD2 <- parallelize(jsc, strVector, 3)
+ expect_equal(take(strVectorRDD, 4), as.list(strVector))
+ expect_equal(take(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
+
+ strListRDD <- parallelize(jsc, strList, 4)
+ strListRDD2 <- parallelize(jsc, strList, 1)
+ expect_equal(take(strListRDD, 3), as.list(head(strList, n = 3)))
+ expect_equal(take(strListRDD2, 1), as.list(head(strList, n = 1)))
+
+ expect_true(length(take(strListRDD, 0)) == 0)
+ expect_true(length(take(strVectorRDD, 0)) == 0)
+ expect_true(length(take(numListRDD, 0)) == 0)
+ expect_true(length(take(numVectorRDD, 0)) == 0)
+})
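+
+# Illustrative sketch (not part of the original suite; assumes nothing beyond
+# base R): take() conceptually scans partitions in order and stops once it has
+# gathered `num` elements, which is why the partition-boundary cases above
+# matter. A pure-R analogue over a list of partitions:
+takeSketch <- function(partitions, num) {
+  elems <- list()
+  for (part in partitions) {
+    if (length(elems) >= num) break
+    # Take only as many elements as are still needed from this partition
+    elems <- c(elems, head(part, num - length(elems)))
+  }
+  elems
+}
+stopifnot(identical(takeSketch(list(as.list(1:5), as.list(6:10)), 3),
+                    as.list(1:3)))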
+
diff --git a/R/pkg/inst/tests/test_textFile.R b/R/pkg/inst/tests/test_textFile.R
new file mode 100644
index 0000000000..7bb3e80031
--- /dev/null
+++ b/R/pkg/inst/tests/test_textFile.R
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("the textFile() function")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("textFile() on a local file returns an RDD", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+ expect_true(inherits(rdd, "RDD"))
+ expect_true(count(rdd) > 0)
+ expect_true(count(rdd) == 2)
+
+ unlink(fileName)
+})
+
+test_that("textFile() followed by a collect() returns the same content", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+ expect_equal(collect(rdd), as.list(mockFile))
+
+ unlink(fileName)
+})
+
+test_that("textFile() word count works as expected", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+
+ words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ counts <- reduceByKey(wordCount, "+", 2L)
+ output <- collect(counts)
+ expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
+ list("Spark", 2))
+ expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+ unlink(fileName)
+})
+
+test_that("several transformations on RDD created by textFile()", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName) # RDD
+ for (i in 1:10) {
+ # PipelinedRDD initially created from RDD
+ rdd <- lapply(rdd, function(x) paste(x, x))
+ }
+ collect(rdd)
+
+ unlink(fileName)
+})
+
+test_that("textFile() followed by a saveAsTextFile() returns the same content", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
+ saveAsTextFile(rdd, fileName2)
+ rdd <- textFile(sc, fileName2)
+ expect_equal(collect(rdd), as.list(mockFile))
+
+ unlink(fileName1)
+ unlink(fileName2)
+})
+
+test_that("saveAsTextFile() on a parallelized list works as expected", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ l <- list(1, 2, 3)
+ rdd <- parallelize(sc, l)
+ saveAsTextFile(rdd, fileName)
+ rdd <- textFile(sc, fileName)
+ expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
+
+ unlink(fileName)
+})
+
+test_that("textFile() and saveAsTextFile() word count works as expected", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
+
+ words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ counts <- reduceByKey(wordCount, "+", 2L)
+
+ saveAsTextFile(counts, fileName2)
+ rdd <- textFile(sc, fileName2)
+
+ output <- collect(rdd)
+ expected <- list(list("awesome.", 1), list("Spark", 2),
+ list("pretty.", 1), list("is", 2))
+ expectedStr <- lapply(expected, function(x) { toString(x) })
+ expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
+
+ unlink(fileName1)
+ unlink(fileName2)
+})
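+
+# (Descriptive note: saveAsTextFile() writes toString() of each element, so a
+# key-value pair like list("is", 2) round-trips as the line "is, 2" -- hence
+# the expectedStr mapping above.)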
+
+test_that("textFile() on multiple paths", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines("Spark is pretty.", fileName1)
+ writeLines("Spark is awesome.", fileName2)
+
+ rdd <- textFile(sc, c(fileName1, fileName2))
+ expect_true(count(rdd) == 2)
+
+ unlink(fileName1)
+ unlink(fileName2)
+})
+
+test_that("Pipelined operations on RDDs created using textFile", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+
+ lengths <- lapply(rdd, function(x) { length(x) })
+ expect_equal(collect(lengths), list(1, 1))
+
+ lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
+ expect_equal(collect(lengthsPipelined), list(11, 11))
+
+ lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
+ expect_equal(collect(lengths30), list(31, 31))
+
+ lengths20 <- lapply(lengths, function(x) { x + 20 })
+ expect_equal(collect(lengths20), list(21, 21))
+
+ unlink(fileName)
+})
+
diff --git a/R/pkg/inst/tests/test_utils.R b/R/pkg/inst/tests/test_utils.R
new file mode 100644
index 0000000000..9c5bb42793
--- /dev/null
+++ b/R/pkg/inst/tests/test_utils.R
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in utils.R")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+test_that("convertJListToRList() gives back (deserializes) the original JLists
+ of strings and integers", {
+ # It's hard to manually create a Java List using rJava, since it does not
+ # support generics well. Instead, we rely on collect() returning a
+ # JList.
+ nums <- as.list(1:10)
+ rdd <- parallelize(sc, nums, 1L)
+ jList <- callJMethod(rdd@jrdd, "collect")
+ rList <- convertJListToRList(jList, flatten = TRUE)
+ expect_equal(rList, nums)
+
+  strs <- as.list(c("hello", "spark"))
+ rdd <- parallelize(sc, strs, 2L)
+ jList <- callJMethod(rdd@jrdd, "collect")
+ rList <- convertJListToRList(jList, flatten = TRUE)
+ expect_equal(rList, strs)
+})
+
+test_that("serializeToBytes on RDD", {
+ # File content
+ mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ text.rdd <- textFile(sc, fileName)
+ expect_true(getSerializedMode(text.rdd) == "string")
+ ser.rdd <- serializeToBytes(text.rdd)
+ expect_equal(collect(ser.rdd), as.list(mockFile))
+ expect_true(getSerializedMode(ser.rdd) == "byte")
+
+ unlink(fileName)
+})
+
+test_that("cleanClosure on R functions", {
+ y <- c(1, 2, 3)
+ g <- function(x) { x + 1 }
+ f <- function(x) { g(x) + y }
+ newF <- cleanClosure(f)
+ env <- environment(newF)
+ expect_equal(length(ls(env)), 2) # y, g
+ actual <- get("y", envir = env, inherits = FALSE)
+ expect_equal(actual, y)
+ actual <- get("g", envir = env, inherits = FALSE)
+ expect_equal(actual, g)
+
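+  # In other words (illustrative): cleanClosure() copies just the free
+  # variables a function actually references ("y" and "g" here) into a fresh
+  # environment, keeping the serialized closure small.
+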
+ # Test for nested enclosures and package variables.
+ env2 <- new.env()
+ funcEnv <- new.env(parent = env2)
+ f <- function(x) { log(g(x) + y) }
+ environment(f) <- funcEnv # enclosing relationship: f -> funcEnv -> env2 -> .GlobalEnv
+ newF <- cleanClosure(f)
+ env <- environment(newF)
+ expect_equal(length(ls(env)), 2) # "min" should not be included
+ actual <- get("y", envir = env, inherits = FALSE)
+ expect_equal(actual, y)
+ actual <- get("g", envir = env, inherits = FALSE)
+ expect_equal(actual, g)
+
+ base <- c(1, 2, 3)
+ l <- list(field = matrix(1))
+ field <- matrix(2)
+ defUse <- 3
+ g <- function(x) { x + y }
+ f <- function(x) {
+ defUse <- base::as.integer(x) + 1 # Test for access operators `::`.
+    lapply(x, g) + 1 # Test for capturing function call "g"'s closure as an argument of lapply.
+ l$field[1,1] <- 3 # Test for access operators `$`.
+    res <- defUse + l$field[1,] # Test for the def-use chain of "defUse", and the empty ("") index symbol.
+ f(res) # Test for recursive calls.
+ }
+ newF <- cleanClosure(f)
+ env <- environment(newF)
+ expect_equal(length(ls(env)), 3) # Only "g", "l" and "f". No "base", "field" or "defUse".
+ expect_true("g" %in% ls(env))
+ expect_true("l" %in% ls(env))
+ expect_true("f" %in% ls(env))
+ expect_equal(get("l", envir = env, inherits = FALSE), l)
+ # "y" should be in the environemnt of g.
+ newG <- get("g", envir = env, inherits = FALSE)
+ env <- environment(newG)
+ expect_equal(length(ls(env)), 1)
+ actual <- get("y", envir = env, inherits = FALSE)
+ expect_equal(actual, y)
+
+ # Test for function (and variable) definitions.
+ f <- function(x) {
+ g <- function(y) { y * 2 }
+ g(x)
+ }
+ newF <- cleanClosure(f)
+ env <- environment(newF)
+  expect_equal(length(ls(env)), 0) # the locally defined "g" and its parameter "y" should not be captured.
+
+ # Test for overriding variables in base namespace (Issue: SparkR-196).
+ nums <- as.list(1:10)
+ rdd <- parallelize(sc, nums, 2L)
+  t <- 4 # Override base::t in .GlobalEnv.
+ f <- function(x) { x > t }
+ newF <- cleanClosure(f)
+ env <- environment(newF)
+ expect_equal(ls(env), "t")
+ expect_equal(get("t", envir = env, inherits = FALSE), t)
+ actual <- collect(lapply(rdd, f))
+ expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
+ expect_equal(actual, expected)
+
+ # Test for broadcast variables.
+ a <- matrix(nrow=10, ncol=10, data=rnorm(100))
+ aBroadcast <- broadcast(sc, a)
+ normMultiply <- function(x) { norm(aBroadcast$value) * x }
+ newnormMultiply <- SparkR:::cleanClosure(normMultiply)
+ env <- environment(newnormMultiply)
+ expect_equal(ls(env), "aBroadcast")
+ expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
+})
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
new file mode 100644
index 0000000000..3584b418a7
--- /dev/null
+++ b/R/pkg/inst/worker/daemon.R
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Worker daemon
+
+rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
+
+# Preload the SparkR package so forked workers inherit it via copy-on-write,
+# sparing each task a library() call.
+.libPaths(c(rLibDir, .libPaths()))
+suppressPackageStartupMessages(library(SparkR))
+
+port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
+inputCon <- socketConnection(port = port, open = "rb", blocking = TRUE, timeout = 3600)
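+
+# Protocol sketch (descriptive, inferred from the loop below): the JVM writes
+# one int per task on this connection -- a fresh port number. The daemon forks
+# a child that connects back to that port and runs worker.R; two consecutive
+# zero-length reads mean the JVM side is gone and the daemon quits.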
+
+while (TRUE) {
+ ready <- socketSelect(list(inputCon))
+ if (ready) {
+ port <- SparkR:::readInt(inputCon)
+    # There is a small chance the read is interrupted by a signal; retry once.
+ if (length(port) == 0) {
+ port <- SparkR:::readInt(inputCon)
+ if (length(port) == 0) {
+ cat("quitting daemon\n")
+ quit(save = "no")
+ }
+ }
+ p <- parallel:::mcfork()
+ if (inherits(p, "masterProcess")) {
+ close(inputCon)
+ Sys.setenv(SPARKR_WORKER_PORT = port)
+ source(script)
+      # Send SIGUSR1 to this forked child so that it exits after the task.
+ tools::pskill(Sys.getpid(), tools::SIGUSR1)
+ parallel:::mcexit(0L)
+ }
+ }
+}
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
new file mode 100644
index 0000000000..c6542928e8
--- /dev/null
+++ b/R/pkg/inst/worker/worker.R
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Worker class
+
+rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+# Set libPaths to include SparkR package as loadNamespace needs this
+# TODO: Figure out if we can avoid this by not loading any objects that require
+# SparkR namespace
+.libPaths(c(rLibDir, .libPaths()))
+suppressPackageStartupMessages(library(SparkR))
+
+port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
+inputCon <- socketConnection(port = port, blocking = TRUE, open = "rb")
+outputCon <- socketConnection(port = port, blocking = TRUE, open = "wb")
+
+# read the index of the current partition inside the RDD
+partition <- SparkR:::readInt(inputCon)
+
+deserializer <- SparkR:::readString(inputCon)
+serializer <- SparkR:::readString(inputCon)
+
+# Include packages as required
+packageNames <- unserialize(SparkR:::readRaw(inputCon))
+for (pkg in packageNames) {
+ suppressPackageStartupMessages(require(as.character(pkg), character.only=TRUE))
+}
+
+# read function dependencies
+funcLen <- SparkR:::readInt(inputCon)
+computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen))
+env <- environment(computeFunc)
+parent.env(env) <- .GlobalEnv # Attach under global environment.
+
+# Read and set broadcast variables
+numBroadcastVars <- SparkR:::readInt(inputCon)
+if (numBroadcastVars > 0) {
+  for (bcast in seq_len(numBroadcastVars)) {
+ bcastId <- SparkR:::readInt(inputCon)
+ value <- unserialize(SparkR:::readRaw(inputCon))
+ setBroadcastValue(bcastId, value)
+ }
+}
+
+# If -1: read as a normal RDD; if >= 0: treat the input as a pairwise RDD and
+# use the int as the number of hash buckets (partitions) to create.
+numPartitions <- SparkR:::readInt(inputCon)
+
+isEmpty <- SparkR:::readInt(inputCon)
+
+if (isEmpty != 0) {
+
+ if (numPartitions == -1) {
+ if (deserializer == "byte") {
+      # Read the R-serialized data for this partition
+ data <- SparkR:::readDeserialize(inputCon)
+ } else if (deserializer == "string") {
+ data <- as.list(readLines(inputCon))
+ } else if (deserializer == "row") {
+ data <- SparkR:::readDeserializeRows(inputCon)
+ }
+ output <- computeFunc(partition, data)
+ if (serializer == "byte") {
+ SparkR:::writeRawSerialize(outputCon, output)
+ } else if (serializer == "row") {
+ SparkR:::writeRowSerialize(outputCon, output)
+ } else {
+ SparkR:::writeStrings(outputCon, output)
+ }
+ } else {
+ if (deserializer == "byte") {
+      # Read the R-serialized data for this partition
+ data <- SparkR:::readDeserialize(inputCon)
+ } else if (deserializer == "string") {
+ data <- readLines(inputCon)
+ } else if (deserializer == "row") {
+ data <- SparkR:::readDeserializeRows(inputCon)
+ }
+
+ res <- new.env()
+
+ # Step 1: hash the data to an environment
+ hashTupleToEnvir <- function(tuple) {
+ # NOTE: execFunction is the hash function here
+ hashVal <- computeFunc(tuple[[1]])
+ bucket <- as.character(hashVal %% numPartitions)
+ acc <- res[[bucket]]
+ # Create a new accumulator
+ if (is.null(acc)) {
+ acc <- SparkR:::initAccumulator()
+ }
+ SparkR:::addItemToAccumulator(acc, tuple)
+ res[[bucket]] <- acc
+ }
+ invisible(lapply(data, hashTupleToEnvir))
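+
+  # (Illustrative: with numPartitions = 2 and a computeFunc that hashes keys,
+  # a tuple whose key hashes to 3 lands in bucket "1" since 3 %% 2 == 1; res
+  # now maps each bucket name to an accumulator holding that bucket's tuples.)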
+
+ # Step 2: write out all of the environment as key-value pairs.
+ for (name in ls(res)) {
+ SparkR:::writeInt(outputCon, 2L)
+ SparkR:::writeInt(outputCon, as.integer(name))
+ # Truncate the accumulator list to the number of elements we have
+ length(res[[name]]$data) <- res[[name]]$counter
+ SparkR:::writeRawSerialize(outputCon, res[[name]]$data)
+ }
+ }
+}
+
+# End of output
+if (serializer %in% c("byte", "row")) {
+ SparkR:::writeInt(outputCon, 0L)
+}
+
+close(outputCon)
+close(inputCon)