aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorhlin09 <hlin09pu@gmail.com>2015-04-13 20:43:24 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-04-13 20:43:24 -0700
commit0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b (patch)
tree703b469c6eff31c1ce28102bffee0bbc87c083ae /R
parentb45059d0d7809a986ba07a447deb71f11ec6afe4 (diff)
downloadspark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.tar.gz
spark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.tar.bz2
spark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.zip
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.
1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition. 2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`. Author: hlin09 <hlin09pu@gmail.com> Closes #5495 from hlin09/cleanClosureFix and squashes the following commits: 74ec303 [hlin09] Minor refactor and removes redundancy.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/R/RDD.R16
-rw-r--r--R/pkg/R/pairRDD.R4
2 files changed, 4 insertions, 16 deletions
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d6a75007a6..820027ef67 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
# This transformation is the first in its stage:
- .Object@func <- func
+ .Object@func <- cleanClosure(func)
.Object@prev_jrdd <- getJRDD(prev)
.Object@env$prev_serializedMode <- prev@env$serializedMode
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
}
- .Object@func <- pipelinedFunc
+ .Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
# Get the serialization mode of the parent RDD
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
return(rdd@env$jrdd_val)
}
- computeFunc <- function(split, part) {
- rdd@func(split, part)
- }
-
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
- serializedFuncArr <- serialize(computeFunc, connection = NULL)
+ serializedFuncArr <- serialize(rdd@func, connection = NULL)
prev_jrdd <- rdd@prev_jrdd
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
- FUN <- cleanClosure(FUN)
- closureCapturingFunc <- function(split, part) {
- FUN(split, part)
- }
- PipelinedRDD(X, closureCapturingFunc)
+ PipelinedRDD(X, FUN)
})
#' @rdname lapplyPartitionsWithIndex
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index c2396c32a7..739d399f08 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -694,10 +694,6 @@ setMethod("cogroup",
for (i in 1:rddsLen) {
rdds[[i]] <- lapply(rdds[[i]],
function(x) { list(x[[1]], list(i, x[[2]])) })
- # TODO(hao): As issue [SparkR-142] mentions, the right value of i
- # will not be captured into UDF if getJRDD is not invoked.
- # It should be resolved together with that issue.
- getJRDD(rdds[[i]]) # Capture the closure.
}
union.rdd <- Reduce(unionRDD, rdds)
group.func <- function(vlist) {