author      Holden Karau <holden@pigscanfly.ca>   2015-09-21 23:21:24 -0700
committer   Davies Liu <davies.liu@gmail.com>     2015-09-21 23:21:24 -0700
commit      1cd67415728e660a90e4dbe136272b5d6b8f1142 (patch)
tree        5656afe2ba2f26015f8af92be28660662b4d5c8b /python
parent      c986e933a900602af47966bd41edb2116c421a39 (diff)
[SPARK-9821] [PYSPARK] pyspark-reduceByKey-should-take-a-custom-partitioner
From the issue: In Scala, I can supply a custom partitioner to reduceByKey (and other aggregation/repartitioning methods like aggregateByKey and combineByKey), but as far as I can tell from the PySpark API, there's no way to do the same in Python.

Here's an example of my code in Scala:

    weblogs.map(s => (getFileType(s), 1)).reduceByKey(new FileTypePartitioner(), _+_)

But I can't figure out how to do the same in Python. The closest I can get is to call repartition before reduceByKey, like so:

    weblogs.map(lambda s: (getFileType(s), 1)).partitionBy(3, hash_filetype).reduceByKey(lambda v1, v2: v1+v2).collect()

But that defeats the purpose, because I'm shuffling twice instead of once, so my performance is worse instead of better.

Author: Holden Karau <holden@pigscanfly.ca>

Closes #8569 from holdenk/SPARK-9821-pyspark-reduceByKey-should-take-a-custom-partitioner.
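With this change the double shuffle from the issue is no longer needed: the aggregation methods accept a partitionFunc argument directly (defaulting to portable_hash) and forward it to partitionBy, as the diff below shows. A minimal sketch of the intended usage, roughly mirroring the reporter's example; the file_type and file_type_hash helpers and the input path are hypothetical, not part of this patch:

    # Hypothetical helpers standing in for the issue's getFileType() /
    # FileTypePartitioner; assumes an active SparkContext `sc`.
    from operator import add

    def file_type(line):
        # Key each log line by the extension of the requested path.
        return line.rsplit(".", 1)[-1]

    def file_type_hash(key):
        # Map a file-type key to a partition id.
        return {"html": 0, "css": 1, "js": 2}.get(key, 0)

    weblogs = sc.textFile("weblogs")  # hypothetical input path

    # Single shuffle: partitionFunc is forwarded to partitionBy inside combineByKey.
    counts = (weblogs
              .map(lambda s: (file_type(s), 1))
              .reduceByKey(add, numPartitions=3, partitionFunc=file_type_hash))

partitionBy applies the function as partitionFunc(key) % numPartitions, so the helper only needs to return an int per key.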
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py  29
1 file changed, 16 insertions, 13 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 73d7d9a569..56e892243c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -686,7 +686,7 @@ class RDD(object):
other._jrdd_deserializer)
return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
- def groupBy(self, f, numPartitions=None):
+ def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):
"""
Return an RDD of grouped items.
@@ -695,7 +695,7 @@ class RDD(object):
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
"""
- return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
+ return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
@ignore_unicode_prefix
def pipe(self, command, env=None, checkCode=False):
@@ -1539,22 +1539,23 @@ class RDD(object):
"""
return self.map(lambda x: x[1])
- def reduceByKey(self, func, numPartitions=None):
+ def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
- Output will be hash-partitioned with C{numPartitions} partitions, or
+ Output will be partitioned with C{numPartitions} partitions, or
the default parallelism level if C{numPartitions} is not specified.
+ Default partitioner is hash-partition.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
"""
- return self.combineByKey(lambda x: x, func, func, numPartitions)
+ return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
def reduceByKeyLocally(self, func):
"""
@@ -1739,7 +1740,7 @@ class RDD(object):
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
- numPartitions=None):
+ numPartitions=None, partitionFunc=portable_hash):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
@@ -1777,7 +1778,7 @@ class RDD(object):
return merger.items()
locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
- shuffled = locally_combined.partitionBy(numPartitions)
+ shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
def _mergeCombiners(iterator):
merger = ExternalMerger(agg, memory, serializer)
@@ -1786,7 +1787,8 @@ class RDD(object):
return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
- def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
+ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
+ partitionFunc=portable_hash):
"""
Aggregate the values of each key, using given combine functions and a neutral
"zero value". This function can return a different result type, U, than the type
@@ -1800,9 +1802,9 @@ class RDD(object):
return copy.deepcopy(zeroValue)
return self.combineByKey(
- lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
+ lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)
- def foldByKey(self, zeroValue, func, numPartitions=None):
+ def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
"""
Merge the values for each key using an associative function "func"
and a neutral "zeroValue" which may be added to the result an
@@ -1817,13 +1819,14 @@ class RDD(object):
def createZero():
return copy.deepcopy(zeroValue)
- return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
+ return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,
+ partitionFunc)
def _memory_limit(self):
return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
# TODO: support variant with custom partitioner
- def groupByKey(self, numPartitions=None):
+ def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
"""
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.
@@ -1859,7 +1862,7 @@ class RDD(object):
return merger.items()
locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
- shuffled = locally_combined.partitionBy(numPartitions)
+ shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
def groupByKey(it):
merger = ExternalGroupBy(agg, memory, serializer)
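The same partitionFunc keyword is threaded through groupBy, groupByKey, combineByKey, aggregateByKey, and foldByKey in this patch. A small illustrative sketch under the same assumptions (a running SparkContext `sc`; the letter_partitioner below is made up for the example):

    def letter_partitioner(key):
        # Keys starting with a-m go to partition 0, the rest to partition 1.
        return 0 if key < "n" else 1

    pairs = sc.parallelize([("apple", 1), ("nut", 2), ("ant", 3), ("pear", 4)])

    # groupByKey and foldByKey accept the same keyword added by this patch.
    grouped = pairs.groupByKey(numPartitions=2, partitionFunc=letter_partitioner)
    folded = pairs.foldByKey(0, lambda a, b: a + b,
                             numPartitions=2, partitionFunc=letter_partitioner)

    print(grouped.getNumPartitions())  # 2
    print(sorted(folded.collect()))    # [('ant', 3), ('apple', 1), ('nut', 2), ('pear', 4)]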