diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-06-20 00:06:57 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-20 00:06:57 -0700 |
commit | f46e02fcdbb3f86a8761c078708388d18282ee0c (patch) | |
tree | 3d4ec7204f5221ad070cb56744b0b45dea5220cc | |
parent | c55bbb49f7ec653f0ff635015d3bc789ca26c4eb (diff) | |
download | spark-f46e02fcdbb3f86a8761c078708388d18282ee0c.tar.gz spark-f46e02fcdbb3f86a8761c078708388d18282ee0c.tar.bz2 spark-f46e02fcdbb3f86a8761c078708388d18282ee0c.zip |
SPARK-2203: PySpark defaults to use same num reduce partitions as map side
For shuffle-based operators, such as rdd.groupBy() or rdd.sortByKey(), PySpark will always assume that the default parallelism to use for the reduce side is ctx.defaultParallelism, which is a constant typically determined by the number of cores in cluster.
In contrast, Spark's Partitioner#defaultPartitioner will use the same number of reduce partitions as map partitions unless the defaultParallelism config is explicitly set. This tends to be a better default in order to avoid OOMs, and should also be the behavior of PySpark.
JIRA: https://issues.apache.org/jira/browse/SPARK-2203
Author: Aaron Davidson <aaron@databricks.com>
Closes #1138 from aarondav/pyfix and squashes the following commits:
1bd5751 [Aaron Davidson] SPARK-2203: PySpark defaults to use same num reduce partitions as map partitions
-rw-r--r-- | python/pyspark/rdd.py | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a0b2c744f0..62a95c8467 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -512,7 +512,7 @@ class RDD(object): [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] """ if numPartitions is None: - numPartitions = self.ctx.defaultParallelism + numPartitions = self._defaultReducePartitions() bounds = list() @@ -1154,7 +1154,7 @@ class RDD(object): set([]) """ if numPartitions is None: - numPartitions = self.ctx.defaultParallelism + numPartitions = self._defaultReducePartitions() if partitionFunc is None: partitionFunc = lambda x: 0 if x is None else hash(x) @@ -1212,7 +1212,7 @@ class RDD(object): [('a', '11'), ('b', '1')] """ if numPartitions is None: - numPartitions = self.ctx.defaultParallelism + numPartitions = self._defaultReducePartitions() def combineLocally(iterator): combiners = {} for x in iterator: @@ -1475,6 +1475,21 @@ class RDD(object): java_storage_level.replication()) return storage_level + def _defaultReducePartitions(self): + """ + Returns the default number of partitions to use during reduce tasks (e.g., groupBy). + If spark.default.parallelism is set, then we'll use the value from SparkContext + defaultParallelism, otherwise we'll use the number of partitions in this RDD. + + This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce + the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will + be inherent. + """ + if self.ctx._conf.contains("spark.default.parallelism"): + return self.ctx.defaultParallelism + else: + return self.getNumPartitions() + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those |