aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r--python/pyspark/join.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b4a8447137..efc1ef9396 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -35,8 +35,8 @@ from pyspark.resultiterable import ResultIterable
def _do_python_join(rdd, other, numPartitions, dispatch):
- vs = rdd.map(lambda (k, v): (k, (1, v)))
- ws = other.map(lambda (k, v): (k, (2, v)))
+ vs = rdd.mapValues(lambda v: (1, v))
+ ws = other.mapValues(lambda v: (2, v))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
@@ -98,8 +98,8 @@ def python_full_outer_join(rdd, other, numPartitions):
def python_cogroup(rdds, numPartitions):
def make_mapper(i):
- return lambda (k, v): (k, (i, v))
- vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
+ return lambda v: (i, v)
+ vrdds = [rdd.mapValues(make_mapper(i)) for i, rdd in enumerate(rdds)]
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
rdd_len = len(vrdds)