author	Davies Liu <davies.liu@gmail.com>	2015-04-09 17:07:23 -0700
committer	Josh Rosen <joshrosen@databricks.com>	2015-04-09 17:07:23 -0700
commit	b5c51c8df480f1a82a82e4d597d8eea631bffb4e (patch)
tree	7842078dd2b5d8dab92a129725353647136e8a9f /python/pyspark/rdd.py
parent	9c67049b4ef416a80803ccb958bbac1dd02cc380 (diff)
[SPARK-3074] [PySpark] support groupByKey() with single huge key
This patch changes groupByKey() to use an external-sort-based approach so that it can support a single huge key. For example, it can group a dataset containing one hot key with 40 million values (strings) using 500M of memory for the Python worker, finishing in about 2 minutes (a hash-based approach would need 6G of memory). During groupByKey(), an in-memory groupBy is done first. If the dataset cannot fit in memory, the data is partitioned by hash. If one partition still cannot fit in memory, it switches to a sort-based groupBy().

Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>

Closes #1977 from davies/groupby and squashes the following commits:

af3713a [Davies Liu] make sure it's iterator
67772dd [Davies Liu] fix tests
e78c15c [Davies Liu] address comments
0b0fde8 [Davies Liu] address comments
0dcf320 [Davies Liu] address comments, rollback changes in ResultIterable
e3b8eab [Davies Liu] fix narrow dependency
2a1857a [Davies Liu] typo
d2f053b [Davies Liu] add repr for FlattedValuesSerializer
c6a2f8d [Davies Liu] address comments
9e2df24 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
2b9c261 [Davies Liu] fix typo in comments
70aadcd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
a14b4bd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
ab5515b [Davies Liu] Merge branch 'master' into groupby
651f891 [Davies Liu] simplify GroupByKey
1578f2e [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
1f69f93 [Davies Liu] fix tests
0d3395f [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
341f1e0 [Davies Liu] add comments, refactor
47918b8 [Davies Liu] remove unused code
6540948 [Davies Liu] address comments:
17f4ec6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
4d4bc86 [Davies Liu] bugfix
8ef965e [Davies Liu] Merge branch 'master' into groupby
fbc504a [Davies Liu] Merge branch 'master' into groupby
779ed03 [Davies Liu] fix merge conflict
2c1d05b [Davies Liu] refactor, minor turning
b48cda5 [Davies Liu] Merge branch 'master' into groupby
85138e6 [Davies Liu] Merge branch 'master' into groupby
acd8e1b [Davies Liu] fix memory when groupByKey().count()
905b233 [Davies Liu] Merge branch 'sort' into groupby
1f075ed [Davies Liu] Merge branch 'master' into sort
4b07d39 [Davies Liu] compress the data while spilling
0a081c6 [Davies Liu] Merge branch 'master' into groupby
f157fe7 [Davies Liu] Merge branch 'sort' into groupby
eb53ca6 [Davies Liu] Merge branch 'master' into sort
b2dc3bf [Davies Liu] Merge branch 'sort' into groupby
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
11ba318 [Davies Liu] typo
085aef8 [Davies Liu] Merge branch 'master' into groupby
3ee58e5 [Davies Liu] switch to sort based groupBy, based on size of data
1ea0669 [Davies Liu] choose sort based groupByKey() automatically
b40bae7 [Davies Liu] bugfix
efa23df [Davies Liu] refactor, add spark.shuffle.sort=False
250be4e [Davies Liu] flatten the combined values when dumping into disks
d05060d [Davies Liu] group the same key before shuffle, reduce the comparison during sorting
083d842 [Davies Liu] sorted based groupByKey()
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
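
The scenario above can be sketched end to end. The snippet below is illustrative only: the app name, value count, memory budget, and partition counts are made up, but it follows the configuration the commit message describes (a bounded spark.python.worker.memory with spark.shuffle.spill enabled), so the external, sort-based path can take over when one key's values do not fit in memory.

from pyspark import SparkConf, SparkContext

# Hypothetical settings: cap the Python worker memory and keep spilling on,
# so groupByKey() can fall back to the external (sort-based) merger.
conf = (SparkConf()
        .setAppName("groupByKey-hot-key-sketch")
        .set("spark.python.worker.memory", "512m")
        .set("spark.shuffle.spill", "true"))
sc = SparkContext(conf=conf)

# Every record shares the single key "hot", so all values land in one group.
skewed = sc.parallelize(range(1000000), 8).map(lambda i: ("hot", str(i)))

# The grouped values come back as a ResultIterable and can be consumed lazily.
print(sorted(skewed.groupByKey(4).mapValues(len).collect()))  # [('hot', 1000000)]

sc.stop()
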
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--	python/pyspark/rdd.py	48
1 file changed, 36 insertions(+), 12 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 2d05611321..1b18789040 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -41,7 +41,7 @@ from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
- get_used_memory, ExternalSorter
+ get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
from py4j.java_collections import ListConverter, MapConverter
@@ -573,8 +573,8 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true')
- memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+ spill = self._can_spill()
+ memory = self._memory_limit()
serializer = self._jrdd_deserializer
def sortPartition(iterator):
@@ -1699,10 +1699,8 @@ class RDD(object):
numPartitions = self._defaultReducePartitions()
serializer = self.ctx.serializer
- spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
- == 'true')
- memory = _parse_memory(self.ctx._conf.get(
- "spark.python.worker.memory", "512m"))
+ spill = self._can_spill()
+ memory = self._memory_limit()
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combineLocally(iterator):
@@ -1755,21 +1753,28 @@ class RDD(object):
return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
+ def _can_spill(self):
+ return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+ def _memory_limit(self):
+ return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+
# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""
Group the values for each key in the RDD into a single sequence.
- Hash-partitions the resulting RDD with into numPartitions partitions.
+ Hash-partitions the resulting RDD with numPartitions partitions.
Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
+ >>> sorted(x.groupByKey().mapValues(len).collect())
+ [('a', 2), ('b', 1)]
+ >>> sorted(x.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
"""
-
def createCombiner(x):
return [x]
@@ -1781,8 +1786,27 @@ class RDD(object):
a.extend(b)
return a
- return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numPartitions).mapValues(lambda x: ResultIterable(x))
+ spill = self._can_spill()
+ memory = self._memory_limit()
+ serializer = self._jrdd_deserializer
+ agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
+
+ def combine(iterator):
+ merger = ExternalMerger(agg, memory * 0.9, serializer) \
+ if spill else InMemoryMerger(agg)
+ merger.mergeValues(iterator)
+ return merger.iteritems()
+
+ locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
+ shuffled = locally_combined.partitionBy(numPartitions)
+
+ def groupByKey(it):
+ merger = ExternalGroupBy(agg, memory, serializer)\
+ if spill else InMemoryMerger(agg)
+ merger.mergeCombiners(it)
+ return merger.iteritems()
+
+ return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
def flatMapValues(self, f):
"""