From 8edc9d0330c94b50e01956ae88693cff4e0977b2 Mon Sep 17 00:00:00 2001
From: Erik Selin
Date: Tue, 3 Jun 2014 13:31:16 -0700
Subject: [SPARK-1468] Modify the partition function used by partitionBy.

Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes.

Associated JIRA at https://issues.apache.org/jira/browse/SPARK-1468

Author: Erik Selin

Closes #371 from tyro89/consistent_hashing and squashes the following commits:

201c301 [Erik Selin] Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes.
---
 python/pyspark/rdd.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

(limited to 'python')

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f3b1f1a665..1b3c460dd6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1062,7 +1062,7 @@ class RDD(object):
         return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numPartitions, partitionFunc=hash):
+    def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -1073,6 +1073,9 @@ class RDD(object):
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
+
+        if partitionFunc is None:
+            partitionFunc = lambda x: 0 if x is None else hash(x)
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
-- 
cgit v1.2.3