diff options
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r-- | python/pyspark/join.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py index efc1ef9396..c3491defb2 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -48,7 +48,7 @@ def python_join(rdd, other, numPartitions): vbuf.append(v) elif n == 2: wbuf.append(v) - return [(v, w) for v in vbuf for w in wbuf] + return ((v, w) for v in vbuf for w in wbuf) return _do_python_join(rdd, other, numPartitions, dispatch) @@ -62,7 +62,7 @@ def python_right_outer_join(rdd, other, numPartitions): wbuf.append(v) if not vbuf: vbuf.append(None) - return [(v, w) for v in vbuf for w in wbuf] + return ((v, w) for v in vbuf for w in wbuf) return _do_python_join(rdd, other, numPartitions, dispatch) @@ -76,7 +76,7 @@ def python_left_outer_join(rdd, other, numPartitions): wbuf.append(v) if not wbuf: wbuf.append(None) - return [(v, w) for v in vbuf for w in wbuf] + return ((v, w) for v in vbuf for w in wbuf) return _do_python_join(rdd, other, numPartitions, dispatch) @@ -104,8 +104,9 @@ def python_cogroup(rdds, numPartitions): rdd_len = len(vrdds) def dispatch(seq): - bufs = [[] for i in range(rdd_len)] - for (n, v) in seq: + bufs = [[] for _ in range(rdd_len)] + for n, v in seq: bufs[n].append(v) - return tuple(map(ResultIterable, bufs)) + return tuple(ResultIterable(vs) for vs in bufs) + return union_vrdds.groupByKey(numPartitions).mapValues(dispatch) |