diff options
author | hlin09 <hlin09pu@gmail.com> | 2015-04-13 20:43:24 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-04-13 20:43:24 -0700 |
commit | 0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b (patch) | |
tree | 703b469c6eff31c1ce28102bffee0bbc87c083ae /R | |
parent | b45059d0d7809a986ba07a447deb71f11ec6afe4 (diff) | |
download | spark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.tar.gz spark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.tar.bz2 spark-0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b.zip |
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.
1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition.
2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`.
Author: hlin09 <hlin09pu@gmail.com>
Closes #5495 from hlin09/cleanClosureFix and squashes the following commits:
74ec303 [hlin09] Minor refactor and removes redundancy.
Diffstat (limited to 'R')
-rw-r--r-- | R/pkg/R/RDD.R | 16 | ||||
-rw-r--r-- | R/pkg/R/pairRDD.R | 4 |
2 files changed, 4 insertions, 16 deletions
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index d6a75007a6..820027ef67 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) { # This transformation is the first in its stage: - .Object@func <- func + .Object@func <- cleanClosure(func) .Object@prev_jrdd <- getJRDD(prev) .Object@env$prev_serializedMode <- prev@env$serializedMode # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD @@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) pipelinedFunc <- function(split, iterator) { func(split, prev@func(split, iterator)) } - .Object@func <- pipelinedFunc + .Object@func <- cleanClosure(pipelinedFunc) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline # Get the serialization mode of the parent RDD .Object@env$prev_serializedMode <- prev@env$prev_serializedMode @@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), return(rdd@env$jrdd_val) } - computeFunc <- function(split, part) { - rdd@func(split, part) - } - packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL) broadcastArr <- lapply(ls(.broadcastNames), function(name) { get(name, .broadcastNames) }) - serializedFuncArr <- serialize(computeFunc, connection = NULL) + serializedFuncArr <- serialize(rdd@func, connection = NULL) prev_jrdd <- rdd@prev_jrdd @@ -551,11 +547,7 @@ setMethod("mapPartitions", setMethod("lapplyPartitionsWithIndex", signature(X = "RDD", FUN = "function"), function(X, FUN) { - FUN <- cleanClosure(FUN) - closureCapturingFunc <- function(split, part) { - FUN(split, part) - } - PipelinedRDD(X, closureCapturingFunc) + PipelinedRDD(X, FUN) }) #' @rdname lapplyPartitionsWithIndex diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index c2396c32a7..739d399f08 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -694,10 +694,6 @@ setMethod("cogroup", for (i in 1:rddsLen) { rdds[[i]] <- lapply(rdds[[i]], function(x) { list(x[[1]], list(i, x[[2]])) }) - # TODO(hao): As issue [SparkR-142] mentions, the right value of i - # will not be captured into UDF if getJRDD is not invoked. - # It should be resolved together with that issue. - getJRDD(rdds[[i]]) # Capture the closure. } union.rdd <- Reduce(unionRDD, rdds) group.func <- function(vlist) { |