author | zsxwing <zsxwing@gmail.com> | 2015-06-17 14:03:15 -0700
---|---|---
committer | Andrew Or <andrew@databricks.com> | 2015-06-17 14:03:15 -0700
commit | a411a40de2209c56e898e3fb4af955d7b55af11c | (patch)
tree | 82921029e9b268c01f920ee3688c2a8edacb347d | /core/src
parent | 0fc4b96f3e3bf81724ac133a6acc97c1b77271b4 | (diff)
[SPARK-7913] [CORE] Increase the maximum capacity of PartitionedPairBuffer, PartitionedSerializedPairBuffer and AppendOnlyMap
The previous growing strategy was to always double the capacity.
This PR adjusts the strategy: double the capacity, but if doubling would overflow, use the maximum capacity as the new capacity. It raises the maximum capacity of PartitionedPairBuffer from `2 ^ 29` to `(2 ^ 30) - 1`, the maximum capacity of PartitionedSerializedPairBuffer from `2 ^ 28` to `(2 ^ 29) - 1`, and the maximum capacity of AppendOnlyMap from `0.7 * (2 ^ 29)` to `2 ^ 29`.
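The new strategy amounts to overflow-aware doubling with a clamp. A minimal sketch of the idea (illustrative names, not the patched Spark code; `maxCapacity` stands in for each collection's own limit):

```scala
object GrowthStrategy {
  // Overflow-aware doubling, mirroring the growArray/growMetaBuffer logic in
  // this PR. `maxCapacity` is a stand-in for each collection's own limit.
  def nextCapacity(capacity: Int, maxCapacity: Int): Int = {
    val doubled = capacity * 2
    // Int overflow makes `doubled` negative; in that case, or when doubling
    // merely exceeds the limit, clamp to the maximum instead of throwing.
    if (doubled < 0 || doubled > maxCapacity) maxCapacity else doubled
  }

  def main(args: Array[String]): Unit = {
    val max = Int.MaxValue / 2              // PartitionedPairBuffer's (2 ^ 30) - 1
    println(nextCapacity(64, max))          // 128: normal doubling
    println(nextCapacity(1 << 29, max))     // 1073741823: doubling would exceed max, so clamp
  }
}
```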
Author: zsxwing <zsxwing@gmail.com>
Closes #6456 from zsxwing/SPARK-7913 and squashes the following commits:
abcb932 [zsxwing] Address comments
e30b61b [zsxwing] Increase the maximum capacity of AppendOnlyMap
05b6420 [zsxwing] Update the exception message
64fe227 [zsxwing] Increase the maximum capacity of PartitionedPairBuffer and PartitionedSerializedPairBuffer
Diffstat (limited to 'core/src')
core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
3 files changed, 53 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 290282c9c2..d215ee43cb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -32,12 +32,18 @@ import org.apache.spark.annotation.DeveloperApi
  * size, which is guaranteed to explore all spaces for each key (see
  * http://en.wikipedia.org/wiki/Quadratic_probing).
  *
+ * The map can support up to `536870912 (2 ^ 29)` elements.
+ *
  * TODO: Cache the hash values of each key? java.util.HashMap does that.
  */
 @DeveloperApi
 class AppendOnlyMap[K, V](initialCapacity: Int = 64)
   extends Iterable[(K, V)] with Serializable {
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+
+  import AppendOnlyMap._
+
+  require(initialCapacity <= MAXIMUM_CAPACITY,
+    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
 
   private val LOAD_FACTOR = 0.7
@@ -193,8 +199,11 @@
 
   /** Increase table size by 1, rehashing if necessary */
   private def incrementSize() {
+    if (curSize == MAXIMUM_CAPACITY) {
+      throw new IllegalStateException(s"Can't put more than ${MAXIMUM_CAPACITY} elements")
+    }
     curSize += 1
-    if (curSize > growThreshold) {
+    if (curSize > growThreshold && capacity < MAXIMUM_CAPACITY) {
       growTable()
     }
   }
@@ -206,12 +215,8 @@
 
   /** Double the table's size and re-hash everything */
   protected def growTable() {
-    val newCapacity = capacity * 2
-    if (newCapacity >= (1 << 30)) {
-      // We can't make the table this big because we want an array of 2x
-      // that size for our data, but array sizes are at most Int.MaxValue
-      throw new Exception("Can't make capacity bigger than 2^29 elements")
-    }
+    // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
+    val newCapacity = (capacity * 2).min(MAXIMUM_CAPACITY)
     val newData = new Array[AnyRef](2 * newCapacity)
     val newMask = newCapacity - 1
     // Insert all our old values into the new array. Note that because our old keys are
@@ -292,3 +297,7 @@
    */
   def atGrowThreshold: Boolean = curSize == growThreshold
 }
+
+private object AppendOnlyMap {
+  val MAXIMUM_CAPACITY = (1 << 29)
+}
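Why the map stops at `2 ^ 29`: keys and values are interleaved in a single `AnyRef` array (two slots per entry), JVM array lengths are capped near `Int.MaxValue`, and the probing via `newMask = newCapacity - 1` requires a power-of-two capacity. A back-of-the-envelope check (a sketch, not Spark code):

```scala
// Largest power-of-two capacity whose 2x data array still fits a JVM array.
object AppendOnlyMapLimit {
  def main(args: Array[String]): Unit = {
    val capacity = 1 << 29                    // 536870912 elements
    assert(2L * capacity <= Int.MaxValue)     // data array of 2^30 slots fits
    assert(2L * (1 << 30) > Int.MaxValue)     // the next power of two would not
  }
}
```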
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index 5a6e9a9580..04bb7fc78c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -25,11 +25,16 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
 /**
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
+ *
+ * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
 {
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  import PartitionedPairBuffer._
+
+  require(initialCapacity <= MAXIMUM_CAPACITY,
+    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
 
   // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
@@ -51,11 +56,15 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 
   /** Double the size of the array because we've reached capacity */
   private def growArray(): Unit = {
-    if (capacity == (1 << 29)) {
-      // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
-      throw new Exception("Can't grow buffer beyond 2^29 elements")
+    if (capacity >= MAXIMUM_CAPACITY) {
+      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
-    val newCapacity = capacity * 2
+    val newCapacity =
+      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+        MAXIMUM_CAPACITY
+      } else {
+        capacity * 2
+      }
     val newArray = new Array[AnyRef](2 * newCapacity)
     System.arraycopy(data, 0, newArray, 0, 2 * capacity)
     data = newArray
@@ -86,3 +95,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
     }
   }
 }
+
+private object PartitionedPairBuffer {
+  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+}
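Unlike the map, the buffer has no power-of-two constraint; its only bound is that the backing array of `2 * capacity` AnyRefs fits in a JVM array, hence the higher limit of `Int.MaxValue / 2`. A quick check (illustrative, not Spark code):

```scala
object PairBufferLimit {
  def main(args: Array[String]): Unit = {
    val max = Int.MaxValue / 2                // 1073741823 = (2 ^ 30) - 1
    assert(2L * max <= Int.MaxValue)          // backing array of 2 * max slots fits
    assert(2L * (max + 1) > Int.MaxValue)     // one element more would not
  }
}
```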
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
index 862408b7a4..ae9a48729e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
@@ -48,6 +48,8 @@ import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._
  * | keyStart | keyValLen | partitionId |
  * +-------------+------------+------------+-------------+
  *
+ * The buffer can support up to `536870911 (2 ^ 29 - 1)` records.
+ *
  * @param metaInitialRecords The initial number of entries in the metadata buffer.
  * @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records.
  * @param serializerInstance the serializer used for serializing inserted records.
@@ -63,6 +65,8 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
       " Java-serialized objects.")
   }
 
+  require(metaInitialRecords <= MAXIMUM_RECORDS,
+    s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records")
   private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE)
 
   private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize)
@@ -89,11 +93,17 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
 
   /** Double the size of the array because we've reached capacity */
   private def growMetaBuffer(): Unit = {
-    if (metaBuffer.capacity.toLong * 2 > Int.MaxValue) {
-      // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
-      throw new Exception(s"Can't grow buffer beyond ${Int.MaxValue} bytes")
+    if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) {
+      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_RECORDS} records")
     }
-    val newMetaBuffer = IntBuffer.allocate(metaBuffer.capacity * 2)
+    val newCapacity =
+      if (metaBuffer.capacity * 2 < 0 || metaBuffer.capacity * 2 > MAXIMUM_META_BUFFER_CAPACITY) {
+        // Overflow
+        MAXIMUM_META_BUFFER_CAPACITY
+      } else {
+        metaBuffer.capacity * 2
+      }
+    val newMetaBuffer = IntBuffer.allocate(newCapacity)
     newMetaBuffer.put(metaBuffer.array)
     metaBuffer = newMetaBuffer
   }
@@ -247,12 +257,15 @@ private[spark] class SerializedSortDataFormat extends SortDataFormat[Int, IntBuf
   }
 }
 
-private[spark] object PartitionedSerializedPairBuffer {
+private object PartitionedSerializedPairBuffer {
   val KEY_START = 0 // keyStart, a long, gets split across two ints
   val KEY_VAL_LEN = 2
   val PARTITION = 3
   val RECORD_SIZE = PARTITION + 1 // num ints of metadata
 
+  val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE // (2 ^ 29) - 1
+  val MAXIMUM_META_BUFFER_CAPACITY = MAXIMUM_RECORDS * RECORD_SIZE // (2 ^ 31) - 4
+
   def getKeyStartPos(metaBuffer: IntBuffer, metaBufferPos: Int): Long = {
     val lower32 = metaBuffer.get(metaBufferPos + KEY_START)
     val upper32 = metaBuffer.get(metaBufferPos + KEY_START + 1)
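For the serialized buffer, both new constants follow from `RECORD_SIZE`: each record occupies four ints of metadata, so at most `Int.MaxValue / 4` whole records fit in a maximally sized `IntBuffer`. A quick check of how the two limits relate (illustrative, not Spark code):

```scala
object SerializedBufferLimit {
  def main(args: Array[String]): Unit = {
    val RECORD_SIZE = 4                              // ints of metadata per record
    val maxRecords = Int.MaxValue / RECORD_SIZE      // 536870911 = (2 ^ 29) - 1
    val maxMetaCapacity = maxRecords * RECORD_SIZE   // 2147483644 = (2 ^ 31) - 4
    assert(maxRecords == (1 << 29) - 1)
    assert(maxMetaCapacity == Int.MaxValue - 3)      // largest multiple of 4 <= Int.MaxValue
  }
}
```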