author    Vyacheslav Baranov <slavik.baranov@gmail.com>  2015-06-17 09:42:29 +0100
committer Sean Owen <sowen@cloudera.com>  2015-06-17 09:42:29 +0100
commit    c13da20a55b80b8632d547240d2c8f97539969a1 (patch)
tree      8c413e96a53fe13054614848a68841704c1a6ad7 /core/src/main/scala/org
parent    e3de14d3b20bff92a4d82ac99825fcb5180fdccc (diff)
[SPARK-8309] [CORE] Support for more than 12M items in OpenHashMap
The problem occurs because the position mask `0xEFFFFFF` is incorrect: its 25th bit (bit 24) is zero, so when the capacity grows beyond 2^24, `OpenHashMap` calculates an incorrect index into the `_values` array. I've also added a size check in `rehash()`, so that it fails fast instead of reporting invalid item indices.

Author: Vyacheslav Baranov <slavik.baranov@gmail.com>

Closes #6763 from SlavikBaranov/SPARK-8309 and squashes the following commits:

8557445 [Vyacheslav Baranov] Resolved review comments
4d5b954 [Vyacheslav Baranov] Resolved review comments
eaf1e68 [Vyacheslav Baranov] Fixed failing test
f9284fd [Vyacheslav Baranov] Resolved review comments
3920656 [Vyacheslav Baranov] SPARK-8309: Support for more than 12M items in OpenHashMap
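A minimal sketch (not part of the commit) of why the old mask loses positions: `0xEFFFFFF` has bit 24 cleared, so any index at or above 2^24 is destroyed by the `pos & POSITION_MASK` step.

```scala
object MaskBugDemo {
  def main(args: Array[String]): Unit = {
    val oldMask = 0xEFFFFFF      // bits 25..27 set, bit 24 zero, bits 0..23 set
    val newMask = (1 << 31) - 1  // 0x7FFFFFFF: all 31 low bits set
    val pos     = 1 << 24        // a position reachable once capacity exceeds 2^24

    println(pos & oldMask)       // 0        -- bit 24 is masked away, index lost
    println(pos & newMask)       // 16777216 -- the position survives intact
  }
}
```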
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala | 10
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 64e7102e36..60bf4dd746 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -45,7 +45,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
loadFactor: Double)
extends Serializable {
- require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
+ s"Can't make capacity bigger than ${OpenHashSet.MAX_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")
require(loadFactor < 1.0, "Load factor must be less than 1.0")
require(loadFactor > 0.0, "Load factor must be greater than 0.0")
@@ -223,6 +224,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
*/
private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
val newCapacity = _capacity * 2
+ require(newCapacity > 0 && newCapacity <= OpenHashSet.MAX_CAPACITY,
+ s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements")
allocateFunc(newCapacity)
val newBitset = new BitSet(newCapacity)
val newData = new Array[T](newCapacity)
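(Aside, not part of the patch: the `newCapacity > 0` clause matters because doubling a capacity of 2^30 overflows `Int`. A quick sketch of the failure mode the check prevents:)

```scala
val capacity = 1 << 30     // OpenHashSet.MAX_CAPACITY
val doubled  = capacity * 2
println(doubled)           // -2147483648: the doubling wrapped to Int.MinValue
println(doubled > 0)       // false -> the require above fails fast instead of
                           // attempting an allocation with a negative size
```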
@@ -276,9 +279,10 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
private[spark]
object OpenHashSet {
+ val MAX_CAPACITY = 1 << 30
val INVALID_POS = -1
- val NONEXISTENCE_MASK = 0x80000000
- val POSITION_MASK = 0xEFFFFFF
+ val NONEXISTENCE_MASK = 1 << 31
+ val POSITION_MASK = (1 << 31) - 1
/**
* A set of specialized hash function implementation to avoid boxing hash code computation
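(A closing sanity check, illustrative rather than from the commit: the new constants partition an `Int` cleanly, with the sign bit reserved as the nonexistence flag and the remaining 31 bits carrying the position.)

```scala
val NONEXISTENCE_MASK = 1 << 31       // 0x80000000: only the sign bit
val POSITION_MASK     = (1 << 31) - 1 // 0x7FFFFFFF: the 31 value bits
val maxPos = (1 << 30) - 1            // largest position, MAX_CAPACITY - 1

assert((maxPos & POSITION_MASK) == maxPos)                        // positions survive masking
assert(((maxPos | NONEXISTENCE_MASK) & POSITION_MASK) == maxPos)  // the flag strips cleanly
```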