diff options
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r-- | python/pyspark/join.py | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f3a7e71f7..b0f1cc1927 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -33,10 +33,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 from pyspark.resultiterable import ResultIterable
 
+
 def _do_python_join(rdd, other, numPartitions, dispatch):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
-    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
+    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
 
 
 def python_join(rdd, other, numPartitions):
@@ -85,6 +86,7 @@ def python_cogroup(rdds, numPartitions):
     vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
     union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
     rdd_len = len(vrdds)
+
     def dispatch(seq):
         bufs = [[] for i in range(rdd_len)]
         for (n, v) in seq: