aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst
diff options
context:
space:
mode:
authorcafreeman <cfreeman@alteryx.com>2015-04-17 13:42:19 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-04-17 13:42:19 -0700
commit59e206deb7346148412bbf5ba4ab626718fadf18 (patch)
treecf4435a81197e76957c4afdcc48686a6e46dc5dc /R/pkg/inst
parenta83571acc938582865efb41645aa1e414f339e46 (diff)
downloadspark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.gz
spark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.bz2
spark-59e206deb7346148412bbf5ba4ab626718fadf18.zip
[SPARK-6807] [SparkR] Merge recent SparkR-pkg changes
This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField. Author: cafreeman <cfreeman@alteryx.com> Author: Davies Liu <davies@databricks.com> Author: Zongheng Yang <zongheng.y@gmail.com> Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com> Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Author: Sun Rui <rui.sun@intel.com> Closes #5436 from davies/R3 and squashes the following commits: c2b09be [Davies Liu] SQLTypes -> schema a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3 168b7fe [Davies Liu] sort generics b1fe460 [Davies Liu] fix conflict in README.md e74c04e [Davies Liu] fix schema.R 4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5 41f8184 [Davies Liu] rm man ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3 1bdcb63 [Zongheng Yang] Updates to README.md. 5a553e7 [cafreeman] Use object attribute instead of argument 71372d9 [cafreeman] Update docs and examples 8526d2e71 [cafreeman] Remove `tojson` functions 6ef5f2d [cafreeman] Fix spacing 7741d66 [cafreeman] Rename the SQL DataType function 141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream 9387402 [Davies Liu] fix style 40199eb [Shivaram Venkataraman] Move except into sorted position 07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD. 7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey ed66c81 [cafreeman] Update `subtract` to work with `generics.R` f3ba785 [cafreeman] Fixed duplicate export 275deb4 [cafreeman] Update `NAMESPACE` and tests 1a3b63d [cafreeman] new version of `CreateDF` 836c4bf [cafreeman] Update `createDataFrame` and `toDF` be5d5c1 [cafreeman] refactor schema functions 40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5 20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4 c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master 136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats cd66603 [cafreeman] new line at EOF 8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep 7dd81b7 [cafreeman] Documentation 0e2a94f [cafreeman] Define functions for schema and fields
Diffstat (limited to 'R/pkg/inst')
-rw-r--r--R/pkg/inst/tests/test_rdd.R193
-rw-r--r--R/pkg/inst/tests/test_shuffle.R12
-rw-r--r--R/pkg/inst/tests/test_sparkSQL.R35
-rw-r--r--R/pkg/inst/worker/worker.R59
4 files changed, 254 insertions, 45 deletions
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index b76e4db03e..3ba7d17163 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -35,7 +35,7 @@ test_that("get number of partitions in RDD", {
test_that("first on RDD", {
expect_true(first(rdd) == 1)
newrdd <- lapply(rdd, function(x) x + 1)
- expect_true(first(newrdd) == 2)
+ expect_true(first(newrdd) == 2)
})
test_that("count and length on RDD", {
@@ -48,7 +48,7 @@ test_that("count by values and keys", {
actual <- countByValue(mods)
expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
+
actual <- countByKey(intRdd)
expected <- list(list(2L, 2L), list(1L, 2L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
@@ -82,11 +82,11 @@ test_that("filterRDD on RDD", {
filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
actual <- collect(filtered.rdd)
expect_equal(actual, list(2, 4, 6, 8, 10))
-
+
filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
actual <- collect(filtered.rdd)
expect_equal(actual, list(list(1L, -1)))
-
+
# Filter out all elements.
filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
actual <- collect(filtered.rdd)
@@ -96,7 +96,7 @@ test_that("filterRDD on RDD", {
test_that("lookup on RDD", {
vals <- lookup(intRdd, 1L)
expect_equal(vals, list(-1, 200))
-
+
vals <- lookup(intRdd, 3L)
expect_equal(vals, list())
})
@@ -110,7 +110,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
})
rdd2 <- lapply(rdd2, function(x) x + x)
actual <- collect(rdd2)
- expected <- list(24, 24, 24, 24, 24,
+ expected <- list(24, 24, 24, 24, 24,
168, 170, 172, 174, 176)
expect_equal(actual, expected)
})
@@ -248,10 +248,10 @@ test_that("flatMapValues() on pairwise RDDs", {
l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
actual <- collect(flatMapValues(l, function(x) { x }))
expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))
-
+
# Generate x to x+1 for every value
actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) }))
- expect_equal(actual,
+ expect_equal(actual,
list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
})
@@ -348,7 +348,7 @@ test_that("top() on RDDs", {
rdd <- parallelize(sc, l)
actual <- top(rdd, 6L)
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
-
+
l <- list("e", "d", "c", "d", "a")
rdd <- parallelize(sc, l)
actual <- top(rdd, 3L)
@@ -358,7 +358,7 @@ test_that("top() on RDDs", {
test_that("fold() on RDDs", {
actual <- fold(rdd, 0, "+")
expect_equal(actual, Reduce("+", nums, 0))
-
+
rdd <- parallelize(sc, list())
actual <- fold(rdd, 0, "+")
expect_equal(actual, 0)
@@ -371,7 +371,7 @@ test_that("aggregateRDD() on RDDs", {
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(10, 4))
-
+
rdd <- parallelize(sc, list())
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(0, 0))
@@ -380,13 +380,13 @@ test_that("aggregateRDD() on RDDs", {
test_that("zipWithUniqueId() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
actual <- collect(zipWithUniqueId(rdd))
- expected <- list(list("a", 0), list("b", 3), list("c", 1),
+ expected <- list(list("a", 0), list("b", 3), list("c", 1),
list("d", 4), list("e", 2))
expect_equal(actual, expected)
-
+
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
actual <- collect(zipWithUniqueId(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
})
@@ -394,13 +394,13 @@ test_that("zipWithUniqueId() on RDDs", {
test_that("zipWithIndex() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
actual <- collect(zipWithIndex(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
-
+
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
actual <- collect(zipWithIndex(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
list("d", 3), list("e", 4))
expect_equal(actual, expected)
})
@@ -427,12 +427,12 @@ test_that("pipeRDD() on RDDs", {
actual <- collect(pipeRDD(rdd, "more"))
expected <- as.list(as.character(1:10))
expect_equal(actual, expected)
-
+
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
actual <- collect(pipeRDD(trailed.rdd, "sort"))
expected <- list("", "1", "2", "3")
expect_equal(actual, expected)
-
+
rev.nums <- 9:0
rev.rdd <- parallelize(sc, rev.nums, 2L)
actual <- collect(pipeRDD(rev.rdd, "sort"))
@@ -446,11 +446,11 @@ test_that("zipRDD() on RDDs", {
actual <- collect(zipRDD(rdd1, rdd2))
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
-
+
mockFile = c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)
-
+
rdd <- textFile(sc, fileName, 1)
actual <- collect(zipRDD(rdd, rdd))
expected <- lapply(mockFile, function(x) { list(x ,x) })
@@ -465,10 +465,125 @@ test_that("zipRDD() on RDDs", {
actual <- collect(zipRDD(rdd, rdd1))
expected <- lapply(mockFile, function(x) { list(x, x) })
expect_equal(actual, expected)
-
+
+ unlink(fileName)
+})
+
+test_that("cartesian() on RDDs", {
+ rdd <- parallelize(sc, 1:3)
+ actual <- collect(cartesian(rdd, rdd))
+ expect_equal(sortKeyValueList(actual),
+ list(
+ list(1, 1), list(1, 2), list(1, 3),
+ list(2, 1), list(2, 2), list(2, 3),
+ list(3, 1), list(3, 2), list(3, 3)))
+
+ # test case where one RDD is empty
+ emptyRdd <- parallelize(sc, list())
+ actual <- collect(cartesian(rdd, emptyRdd))
+ expect_equal(actual, list())
+
+ mockFile = c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+ actual <- collect(cartesian(rdd, rdd))
+ expected <- list(
+ list("Spark is awesome.", "Spark is pretty."),
+ list("Spark is awesome.", "Spark is awesome."),
+ list("Spark is pretty.", "Spark is pretty."),
+ list("Spark is pretty.", "Spark is awesome."))
+ expect_equal(sortKeyValueList(actual), expected)
+
+ rdd1 <- parallelize(sc, 0:1)
+ actual <- collect(cartesian(rdd1, rdd))
+ expect_equal(sortKeyValueList(actual),
+ list(
+ list(0, "Spark is pretty."),
+ list(0, "Spark is awesome."),
+ list(1, "Spark is pretty."),
+ list(1, "Spark is awesome.")))
+
+ rdd1 <- map(rdd, function(x) { x })
+ actual <- collect(cartesian(rdd, rdd1))
+ expect_equal(sortKeyValueList(actual), expected)
+
unlink(fileName)
})
+test_that("subtract() on RDDs", {
+ l <- list(1, 1, 2, 2, 3, 4)
+ rdd1 <- parallelize(sc, l)
+
+ # subtract by itself
+ actual <- collect(subtract(rdd1, rdd1))
+ expect_equal(actual, list())
+
+ # subtract by an empty RDD
+ rdd2 <- parallelize(sc, list())
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
+ l)
+
+ rdd2 <- parallelize(sc, list(2, 4))
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
+ list(1, 1, 3))
+
+ l <- list("a", "a", "b", "b", "c", "d")
+ rdd1 <- parallelize(sc, l)
+ rdd2 <- parallelize(sc, list("b", "d"))
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="character"))),
+ list("a", "a", "c"))
+})
+
+test_that("subtractByKey() on pairwise RDDs", {
+ l <- list(list("a", 1), list("b", 4),
+ list("b", 5), list("a", 2))
+ rdd1 <- parallelize(sc, l)
+
+ # subtractByKey by itself
+ actual <- collect(subtractByKey(rdd1, rdd1))
+ expect_equal(actual, list())
+
+ # subtractByKey by an empty RDD
+ rdd2 <- parallelize(sc, list())
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(l))
+
+ rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(actual,
+ list(list("b", 4), list("b", 5)))
+
+ l <- list(list(1, 1), list(2, 4),
+ list(2, 5), list(1, 2))
+ rdd1 <- parallelize(sc, l)
+ rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(actual,
+ list(list(2, 4), list(2, 5)))
+})
+
+test_that("intersection() on RDDs", {
+ # intersection with self
+ actual <- collect(intersection(rdd, rdd))
+ expect_equal(sort(as.integer(actual)), nums)
+
+ # intersection with an empty RDD
+ emptyRdd <- parallelize(sc, list())
+ actual <- collect(intersection(rdd, emptyRdd))
+ expect_equal(actual, list())
+
+ rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+ rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+ actual <- collect(intersection(rdd1, rdd2))
+ expect_equal(sort(as.integer(actual)), 1:3)
+})
+
test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
@@ -596,9 +711,9 @@ test_that("sortByKey() on pairwise RDDs", {
sortedRdd3 <- sortByKey(rdd3)
actual <- collect(sortedRdd3)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
+
# test on the boundary cases
-
+
# boundary case 1: the RDD to be sorted has only 1 partition
rdd4 <- parallelize(sc, l, 1L)
sortedRdd4 <- sortByKey(rdd4)
@@ -623,7 +738,7 @@ test_that("sortByKey() on pairwise RDDs", {
rdd7 <- parallelize(sc, l3, 2L)
sortedRdd7 <- sortByKey(rdd7)
actual <- collect(sortedRdd7)
- expect_equal(actual, l3)
+ expect_equal(actual, l3)
})
test_that("collectAsMap() on a pairwise RDD", {
@@ -634,12 +749,36 @@ test_that("collectAsMap() on a pairwise RDD", {
rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
vals <- collectAsMap(rdd)
expect_equal(vals, list(a = 1, b = 2))
-
+
rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
vals <- collectAsMap(rdd)
expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
-
+
rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
vals <- collectAsMap(rdd)
expect_equal(vals, list(`1` = "a", `2` = "b"))
})
+
+test_that("sampleByKey() on pairwise RDDs", {
+ rdd <- parallelize(sc, 1:2000)
+ pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
+ fractions <- list(a = 0.2, b = 0.1)
+ sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
+ expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE)
+ expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE)
+ expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE)
+ expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE)
+ expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE)
+ expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE)
+
+ rdd <- parallelize(sc, 1:2000)
+ pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
+ fractions <- list(`2` = 0.2, `3` = 0.1)
+ sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
+ expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE)
+ expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE)
+ expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE)
+ expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE)
+ expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
+ expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
+})
diff --git a/R/pkg/inst/tests/test_shuffle.R b/R/pkg/inst/tests/test_shuffle.R
index d1da8232ae..d7dedda553 100644
--- a/R/pkg/inst/tests/test_shuffle.R
+++ b/R/pkg/inst/tests/test_shuffle.R
@@ -87,6 +87,18 @@ test_that("combineByKey for doubles", {
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
+test_that("combineByKey for characters", {
+ stringKeyRDD <- parallelize(sc,
+ list(list("max", 1L), list("min", 2L),
+ list("other", 3L), list("max", 4L)), 2L)
+ reduced <- combineByKey(stringKeyRDD,
+ function(x) { x }, "+", "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
test_that("aggregateByKey", {
# test aggregateByKey for int keys
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index cf5cf6d169..25831ae2d9 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -44,9 +44,8 @@ test_that("infer types", {
expect_equal(infer_type(list(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(a = 1L, b = "2")),
- list(type = "struct",
- fields = list(list(name = "a", type = "integer", nullable = TRUE),
- list(name = "b", type = "string", nullable = TRUE))))
+ structType(structField(x = "a", type = "integer", nullable = TRUE),
+ structField(x = "b", type = "string", nullable = TRUE)))
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e),
@@ -54,6 +53,18 @@ test_that("infer types", {
valueContainsNull = TRUE))
})
+test_that("structType and structField", {
+ testField <- structField("a", "string")
+ expect_true(inherits(testField, "structField"))
+ expect_true(testField$name() == "a")
+ expect_true(testField$nullable())
+
+ testSchema <- structType(testField, structField("b", "integer"))
+ expect_true(inherits(testSchema, "structType"))
+ expect_true(inherits(testSchema$fields()[[2]], "structField"))
+ expect_true(testSchema$fields()[[1]]$dataType.toString() == "StringType")
+})
+
test_that("create DataFrame from RDD", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
@@ -66,9 +77,8 @@ test_that("create DataFrame from RDD", {
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("_1", "_2"))
- fields <- list(list(name = "a", type = "integer", nullable = TRUE),
- list(name = "b", type = "string", nullable = TRUE))
- schema <- list(type = "struct", fields = fields)
+ schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
+ structField(x = "b", type = "string", nullable = TRUE))
df <- createDataFrame(sqlCtx, rdd, schema)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("a", "b"))
@@ -94,9 +104,8 @@ test_that("toDF", {
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("_1", "_2"))
- fields <- list(list(name = "a", type = "integer", nullable = TRUE),
- list(name = "b", type = "string", nullable = TRUE))
- schema <- list(type = "struct", fields = fields)
+ schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
+ structField(x = "b", type = "string", nullable = TRUE))
df <- toDF(rdd, schema)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("a", "b"))
@@ -635,7 +644,7 @@ test_that("isLocal()", {
expect_false(isLocal(df))
})
-test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
+test_that("unionAll(), except(), and intersect() on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -650,10 +659,10 @@ test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
expect_true(count(unioned) == 6)
expect_true(first(unioned)$name == "Michael")
- subtracted <- sortDF(subtract(df, df2), desc(df$age))
+ excepted <- sortDF(except(df, df2), desc(df$age))
expect_true(inherits(unioned, "DataFrame"))
- expect_true(count(subtracted) == 2)
- expect_true(first(subtracted)$name == "Justin")
+ expect_true(count(excepted) == 2)
+ expect_true(first(excepted)$name == "Justin")
intersected <- sortDF(intersect(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index c6542928e8..014bf7bd7b 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -17,6 +17,23 @@
# Worker class
+# Get current system time
+currentTimeSecs <- function() {
+ as.numeric(Sys.time())
+}
+
+# Get elapsed time
+elapsedSecs <- function() {
+ proc.time()[3]
+}
+
+# Constants
+specialLengths <- list(END_OF_STERAM = 0L, TIMING_DATA = -1L)
+
+# Timing R process boot
+bootTime <- currentTimeSecs()
+bootElap <- elapsedSecs()
+
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
@@ -37,7 +54,7 @@ serializer <- SparkR:::readString(inputCon)
# Include packages as required
packageNames <- unserialize(SparkR:::readRaw(inputCon))
for (pkg in packageNames) {
- suppressPackageStartupMessages(require(as.character(pkg), character.only=TRUE))
+ suppressPackageStartupMessages(library(as.character(pkg), character.only=TRUE))
}
# read function dependencies
@@ -46,6 +63,9 @@ computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen))
env <- environment(computeFunc)
parent.env(env) <- .GlobalEnv # Attach under global environment.
+# Timing init envs for computing
+initElap <- elapsedSecs()
+
# Read and set broadcast variables
numBroadcastVars <- SparkR:::readInt(inputCon)
if (numBroadcastVars > 0) {
@@ -56,6 +76,9 @@ if (numBroadcastVars > 0) {
}
}
+# Timing broadcast
+broadcastElap <- elapsedSecs()
+
# If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int
# as number of partitions to create.
numPartitions <- SparkR:::readInt(inputCon)
@@ -73,14 +96,23 @@ if (isEmpty != 0) {
} else if (deserializer == "row") {
data <- SparkR:::readDeserializeRows(inputCon)
}
+ # Timing reading input data for execution
+ inputElap <- elapsedSecs()
+
output <- computeFunc(partition, data)
+ # Timing computing
+ computeElap <- elapsedSecs()
+
if (serializer == "byte") {
SparkR:::writeRawSerialize(outputCon, output)
} else if (serializer == "row") {
SparkR:::writeRowSerialize(outputCon, output)
} else {
- SparkR:::writeStrings(outputCon, output)
+ # write lines one-by-one with flag
+ lapply(output, function(line) SparkR:::writeString(outputCon, line))
}
+ # Timing output
+ outputElap <- elapsedSecs()
} else {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
@@ -90,6 +122,8 @@ if (isEmpty != 0) {
} else if (deserializer == "row") {
data <- SparkR:::readDeserializeRows(inputCon)
}
+ # Timing reading input data for execution
+ inputElap <- elapsedSecs()
res <- new.env()
@@ -107,6 +141,8 @@ if (isEmpty != 0) {
res[[bucket]] <- acc
}
invisible(lapply(data, hashTupleToEnvir))
+ # Timing computing
+ computeElap <- elapsedSecs()
# Step 2: write out all of the environment as key-value pairs.
for (name in ls(res)) {
@@ -116,13 +152,26 @@ if (isEmpty != 0) {
length(res[[name]]$data) <- res[[name]]$counter
SparkR:::writeRawSerialize(outputCon, res[[name]]$data)
}
+ # Timing output
+ outputElap <- elapsedSecs()
}
+} else {
+ inputElap <- broadcastElap
+ computeElap <- broadcastElap
+ outputElap <- broadcastElap
}
+# Report timing
+SparkR:::writeInt(outputCon, specialLengths$TIMING_DATA)
+SparkR:::writeDouble(outputCon, bootTime)
+SparkR:::writeDouble(outputCon, initElap - bootElap) # init
+SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast
+SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input
+SparkR:::writeDouble(outputCon, computeElap - inputElap) # compute
+SparkR:::writeDouble(outputCon, outputElap - computeElap) # output
+
# End of output
-if (serializer %in% c("byte", "row")) {
- SparkR:::writeInt(outputCon, 0L)
-}
+SparkR:::writeInt(outputCon, specialLengths$END_OF_STERAM)
close(outputCon)
close(inputCon)