about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r-- python/pyspark/join.py | 20
1 files changed, 10 insertions, 10 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 7036c47980..5f4294fb1b 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -32,13 +32,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
-def _do_python_join(rdd, other, numSplits, dispatch):
+def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
- return vs.union(ws).groupByKey(numSplits).flatMapValues(dispatch)
+ return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
-def python_join(rdd, other, numSplits):
+def python_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -47,10 +47,10 @@ def python_join(rdd, other, numSplits):
elif n == 2:
wbuf.append(v)
return [(v, w) for v in vbuf for w in wbuf]
- return _do_python_join(rdd, other, numSplits, dispatch)
+ return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_right_outer_join(rdd, other, numSplits):
+def python_right_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -61,10 +61,10 @@ def python_right_outer_join(rdd, other, numSplits):
if not vbuf:
vbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
- return _do_python_join(rdd, other, numSplits, dispatch)
+ return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_left_outer_join(rdd, other, numSplits):
+def python_left_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -75,10 +75,10 @@ def python_left_outer_join(rdd, other, numSplits):
if not wbuf:
wbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
- return _do_python_join(rdd, other, numSplits, dispatch)
+ return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_cogroup(rdd, other, numSplits):
+def python_cogroup(rdd, other, numPartitions):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
def dispatch(seq):
@@ -89,4 +89,4 @@ def python_cogroup(rdd, other, numSplits):
elif n == 2:
wbuf.append(v)
return (vbuf, wbuf)
- return vs.union(ws).groupByKey(numSplits).mapValues(dispatch)
+ return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)