aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorErik Selin <erik.selin@jadedpixel.com>2014-06-03 13:31:16 -0700
committerMatei Zaharia <matei@databricks.com>2014-06-03 13:31:23 -0700
commit350cfd311a0b93638a8f73900fcd06372d89d99d (patch)
tree8898fe181b6392d75c4a7d00de2f2829da67c6de /python
parent316d026f4d1a3dbd5c29dc2849d32d889b7ca59e (diff)
downloadspark-350cfd311a0b93638a8f73900fcd06372d89d99d.tar.gz
spark-350cfd311a0b93638a8f73900fcd06372d89d99d.tar.bz2
spark-350cfd311a0b93638a8f73900fcd06372d89d99d.zip
[SPARK-1468] Modify the partition function used by partitionBy.
Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes. Associated JIRA at https://issues.apache.org/jira/browse/SPARK-1468 Author: Erik Selin <erik.selin@jadedpixel.com> Closes #371 from tyro89/consistent_hashing and squashes the following commits: 201c301 [Erik Selin] Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes. (cherry picked from commit 8edc9d0330c94b50e01956ae88693cff4e0977b2) Signed-off-by: Matei Zaharia <matei@databricks.com>
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 07578b8d93..efdb6de245 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1045,7 +1045,7 @@ class RDD(object):
return python_right_outer_join(self, other, numPartitions)
# TODO: add option to control map-side combining
- def partitionBy(self, numPartitions, partitionFunc=hash):
+ def partitionBy(self, numPartitions, partitionFunc=None):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -1056,6 +1056,9 @@ class RDD(object):
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
+
+ if partitionFunc is None:
+ partitionFunc = lambda x: 0 if x is None else hash(x)
# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.