about summary refs log tree commit diff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r-- python/pyspark/join.py 13
1 files changed, 7 insertions, 6 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index efc1ef9396..c3491defb2 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -48,7 +48,7 @@ def python_join(rdd, other, numPartitions):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
- return [(v, w) for v in vbuf for w in wbuf]
+ return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
@@ -62,7 +62,7 @@ def python_right_outer_join(rdd, other, numPartitions):
wbuf.append(v)
if not vbuf:
vbuf.append(None)
- return [(v, w) for v in vbuf for w in wbuf]
+ return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
@@ -76,7 +76,7 @@ def python_left_outer_join(rdd, other, numPartitions):
wbuf.append(v)
if not wbuf:
wbuf.append(None)
- return [(v, w) for v in vbuf for w in wbuf]
+ return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
@@ -104,8 +104,9 @@ def python_cogroup(rdds, numPartitions):
rdd_len = len(vrdds)
def dispatch(seq):
- bufs = [[] for i in range(rdd_len)]
- for (n, v) in seq:
+ bufs = [[] for _ in range(rdd_len)]
+ for n, v in seq:
bufs[n].append(v)
- return tuple(map(ResultIterable, bufs))
+ return tuple(ResultIterable(vs) for vs in bufs)
+
return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)