aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r--python/pyspark/join.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f3a7e71f7..b0f1cc1927 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -33,10 +33,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from pyspark.resultiterable import ResultIterable
+
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
- return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
+ return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
def python_join(rdd, other, numPartitions):
@@ -85,6 +86,7 @@ def python_cogroup(rdds, numPartitions):
vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
rdd_len = len(vrdds)
+
def dispatch(seq):
bufs = [[] for i in range(rdd_len)]
for (n, v) in seq: