aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r--python/pyspark/join.py5
1 files changed, 3 insertions, 2 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b..6f94d26ef8 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -31,11 +31,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
+from pyspark.resultiterable import ResultIterable
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
- return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
+ return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
def python_join(rdd, other, numPartitions):
@@ -88,5 +89,5 @@ def python_cogroup(rdd, other, numPartitions):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
- return (vbuf, wbuf)
+ return (ResultIterable(vbuf), ResultIterable(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)