aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-07-21 11:59:54 -0700
committerMatei Zaharia <matei@databricks.com>2014-07-21 11:59:54 -0700
commit872538c600a452ead52638c1ccba90643a9fa41c (patch)
tree858c6073e8bd0393d8b479343cb859775f98a81e /python/pyspark/rdd.py
parentf89cf65d7aced0bb387c05586f9f51cb29865022 (diff)
downloadspark-872538c600a452ead52638c1ccba90643a9fa41c.tar.gz
spark-872538c600a452ead52638c1ccba90643a9fa41c.tar.bz2
spark-872538c600a452ead52638c1ccba90643a9fa41c.zip
[SPARK-2494] [PySpark] make hash of None consistent across machines
In CPython, hash of None is different across machines, it will cause wrong result during shuffle. This PR will fix this. Author: Davies Liu <davies.liu@gmail.com> Closes #1371 from davies/hash_of_none and squashes the following commits: d01745f [Davies Liu] add comments, remove outdated unit tests 5467141 [Davies Liu] disable hijack of hash, use it only for partitionBy() b7118aa [Davies Liu] use __builtin__ instead of __builtins__ 839e417 [Davies Liu] hijack hash to make hash of None consistent across machines
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py35
1 files changed, 32 insertions, 3 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0c35c66680..94ba22306a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,6 +48,35 @@ from py4j.java_collections import ListConverter, MapConverter
__all__ = ["RDD"]
+# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
+# hash for string
+def portable_hash(x):
+ """
+ This function returns a consistent hash code for builtin types, especially
+ for None and tuple with None.
+
+ The algorithm is similar to the one used by CPython 2.7
+
+ >>> portable_hash(None)
+ 0
+ >>> portable_hash((None, 1))
+ 219750521
+ """
+ if x is None:
+ return 0
+ if isinstance(x, tuple):
+ h = 0x345678
+ for i in x:
+ h ^= portable_hash(i)
+ h *= 1000003
+ h &= 0xffffffff
+ h ^= len(x)
+ if h == -1:
+ h = -2
+ return h
+ return hash(x)
+
+
def _extract_concise_traceback():
"""
This function returns the traceback info for a callsite, returns a dict
@@ -1164,7 +1193,9 @@ class RDD(object):
return python_right_outer_join(self, other, numPartitions)
# TODO: add option to control map-side combining
- def partitionBy(self, numPartitions, partitionFunc=None):
+ # portable_hash is used as default, because builtin hash of None is different
+ # across machines.
+ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -1176,8 +1207,6 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- if partitionFunc is None:
- partitionFunc = lambda x: 0 if x is None else hash(x)
# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.