From 872538c600a452ead52638c1ccba90643a9fa41c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 21 Jul 2014 11:59:54 -0700 Subject: [SPARK-2494] [PySpark] make hash of None consistant cross machines In CPython, hash of None is different cross machines, it will cause wrong result during shuffle. This PR will fix this. Author: Davies Liu Closes #1371 from davies/hash_of_none and squashes the following commits: d01745f [Davies Liu] add comments, remove outdated unit tests 5467141 [Davies Liu] disable hijack of hash, use it only for partitionBy() b7118aa [Davies Liu] use __builtin__ instead of __builtins__ 839e417 [Davies Liu] hijack hash to make hash of None consistant cross machines --- python/pyspark/rdd.py | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0c35c66680..94ba22306a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -48,6 +48,35 @@ from py4j.java_collections import ListConverter, MapConverter __all__ = ["RDD"] +# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized +# hash for string +def portable_hash(x): + """ + This function returns consistant hash code for builtin types, especially + for None and tuple with None. + + The algrithm is similar to that one used by CPython 2.7 + + >>> portable_hash(None) + 0 + >>> portable_hash((None, 1)) + 219750521 + """ + if x is None: + return 0 + if isinstance(x, tuple): + h = 0x345678 + for i in x: + h ^= portable_hash(i) + h *= 1000003 + h &= 0xffffffff + h ^= len(x) + if h == -1: + h = -2 + return h + return hash(x) + + def _extract_concise_traceback(): """ This function returns the traceback info for a callsite, returns a dict @@ -1164,7 +1193,9 @@ class RDD(object): return python_right_outer_join(self, other, numPartitions) # TODO: add option to control map-side combining - def partitionBy(self, numPartitions, partitionFunc=None): + # portable_hash is used as default, because builtin hash of None is different + # cross machines. + def partitionBy(self, numPartitions, partitionFunc=portable_hash): """ Return a copy of the RDD partitioned using the specified partitioner. @@ -1176,8 +1207,6 @@ class RDD(object): if numPartitions is None: numPartitions = self._defaultReducePartitions() - if partitionFunc is None: - partitionFunc = lambda x: 0 if x is None else hash(x) # Transferring O(n) objects to Java is too expensive. Instead, we'll # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. -- cgit v1.2.3