aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py56
1 files changed, 28 insertions, 28 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6b6ab6abd9..172ed85fab 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -215,7 +215,7 @@ class RDD(object):
yield pair
return java_cartesian.flatMap(unpack_batches)
- def groupBy(self, f, numSplits=None):
+ def groupBy(self, f, numPartitions=None):
"""
Return an RDD of grouped items.
@@ -224,7 +224,7 @@ class RDD(object):
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
"""
- return self.map(lambda x: (f(x), x)).groupByKey(numSplits)
+ return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
def pipe(self, command, env={}):
"""
@@ -274,7 +274,7 @@ class RDD(object):
def reduce(self, f):
"""
- Reduces the elements of this RDD using the specified commutative and
+ Reduces the elements of this RDD using the specified commutative and
associative binary operator.
>>> from operator import add
@@ -422,22 +422,22 @@ class RDD(object):
"""
return dict(self.collect())
- def reduceByKey(self, func, numSplits=None):
+ def reduceByKey(self, func, numPartitions=None):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
- Output will be hash-partitioned with C{numSplits} splits, or the
- default parallelism level if C{numSplits} is not specified.
+ Output will be hash-partitioned with C{numPartitions} partitions, or
+ the default parallelism level if C{numPartitions} is not specified.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
"""
- return self.combineByKey(lambda x: x, func, func, numSplits)
+ return self.combineByKey(lambda x: x, func, func, numPartitions)
def reduceByKeyLocally(self, func):
"""
@@ -474,7 +474,7 @@ class RDD(object):
"""
return self.map(lambda x: x[0]).countByValue()
- def join(self, other, numSplits=None):
+ def join(self, other, numPartitions=None):
"""
Return an RDD containing all pairs of elements with matching keys in
C{self} and C{other}.
@@ -489,9 +489,9 @@ class RDD(object):
>>> sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
"""
- return python_join(self, other, numSplits)
+ return python_join(self, other, numPartitions)
- def leftOuterJoin(self, other, numSplits=None):
+ def leftOuterJoin(self, other, numPartitions=None):
"""
Perform a left outer join of C{self} and C{other}.
@@ -506,9 +506,9 @@ class RDD(object):
>>> sorted(x.leftOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None))]
"""
- return python_left_outer_join(self, other, numSplits)
+ return python_left_outer_join(self, other, numPartitions)
- def rightOuterJoin(self, other, numSplits=None):
+ def rightOuterJoin(self, other, numPartitions=None):
"""
Perform a right outer join of C{self} and C{other}.
@@ -523,10 +523,10 @@ class RDD(object):
>>> sorted(y.rightOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4))]
"""
- return python_right_outer_join(self, other, numSplits)
+ return python_right_outer_join(self, other, numPartitions)
# TODO: add option to control map-side combining
- def partitionBy(self, numSplits, partitionFunc=hash):
+ def partitionBy(self, numPartitions, partitionFunc=hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -535,22 +535,22 @@ class RDD(object):
>>> set(sets[0]).intersection(set(sets[1]))
set([])
"""
- if numSplits is None:
- numSplits = self.ctx.defaultParallelism
+ if numPartitions is None:
+ numPartitions = self.ctx.defaultParallelism
# Transferring O(n) objects to Java is too expensive. Instead, we'll
- # form the hash buckets in Python, transferring O(numSplits) objects
+ # form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
for (k, v) in iterator:
- buckets[partitionFunc(k) % numSplits].append((k, v))
+ buckets[partitionFunc(k) % numPartitions].append((k, v))
for (split, items) in buckets.iteritems():
yield str(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
- partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
+ partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
rdd = RDD(jrdd, self.ctx)
@@ -561,7 +561,7 @@ class RDD(object):
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
- numSplits=None):
+ numPartitions=None):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
@@ -586,8 +586,8 @@ class RDD(object):
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
"""
- if numSplits is None:
- numSplits = self.ctx.defaultParallelism
+ if numPartitions is None:
+ numPartitions = self.ctx.defaultParallelism
def combineLocally(iterator):
combiners = {}
for (k, v) in iterator:
@@ -597,7 +597,7 @@ class RDD(object):
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
- shuffled = locally_combined.partitionBy(numSplits)
+ shuffled = locally_combined.partitionBy(numPartitions)
def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
@@ -609,10 +609,10 @@ class RDD(object):
return shuffled.mapPartitions(_mergeCombiners)
# TODO: support variant with custom partitioner
- def groupByKey(self, numSplits=None):
+ def groupByKey(self, numPartitions=None):
"""
Group the values for each key in the RDD into a single sequence.
- Hash-partitions the resulting RDD with into numSplits partitions.
+ Hash-partitions the resulting RDD with into numPartitions partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
@@ -630,7 +630,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numSplits)
+ numPartitions)
# TODO: add tests
def flatMapValues(self, f):
@@ -659,7 +659,7 @@ class RDD(object):
return self.cogroup(other)
# TODO: add variant with custom parittioner
- def cogroup(self, other, numSplits=None):
+ def cogroup(self, other, numPartitions=None):
"""
For each key k in C{self} or C{other}, return a resulting RDD that
contains a tuple with the list of values for that key in C{self} as well
@@ -670,7 +670,7 @@ class RDD(object):
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]
"""
- return python_cogroup(self, other, numSplits)
+ return python_cogroup(self, other, numPartitions)
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the