author     zsxwing <zsxwing@gmail.com>          2015-06-17 14:03:15 -0700
committer  Andrew Or <andrew@databricks.com>    2015-06-17 14:03:15 -0700
commit     a411a40de2209c56e898e3fb4af955d7b55af11c (patch)
tree       82921029e9b268c01f920ee3688c2a8edacb347d /core
parent     0fc4b96f3e3bf81724ac133a6acc97c1b77271b4 (diff)
[SPARK-7913] [CORE] Increase the maximum capacity of PartitionedPairBuffer, PartitionedSerializedPairBuffer and AppendOnlyMap
The previous growing strategy always doubles the capacity. This PR adjusts the growing strategy: double the capacity, but if doubling would overflow, use the maximum capacity as the new capacity. It increases the maximum capacity of PartitionedPairBuffer from `2 ^ 29` to `(2 ^ 30) - 1`, the maximum capacity of PartitionedSerializedPairBuffer from `2 ^ 28` to `(2 ^ 29) - 1`, and the maximum capacity of AppendOnlyMap from `0.7 * (2 ^ 29)` to `2 ^ 29`.

Author: zsxwing <zsxwing@gmail.com>

Closes #6456 from zsxwing/SPARK-7913 and squashes the following commits:

abcb932 [zsxwing] Address comments
e30b61b [zsxwing] Increase the maximum capacity of AppendOnlyMap
05b6420 [zsxwing] Update the exception message
64fe227 [zsxwing] Increase the maximum capacity of PartitionedPairBuffer and PartitionedSerializedPairBuffer
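As an illustration of the growth strategy described above, here is a minimal standalone sketch, not the Spark code itself: `GrowthStrategySketch`, `MaximumCapacity`, and `nextCapacity` are hypothetical names, and each collection in this patch defines its own cap (`1 << 29` for AppendOnlyMap, `Int.MaxValue / 2` for PartitionedPairBuffer).

```scala
// Minimal sketch of the capped-doubling growth strategy; names are
// illustrative only, not taken from the Spark sources in this patch.
object GrowthStrategySketch {
  // Hypothetical cap; the real collections each define their own.
  val MaximumCapacity: Int = 1 << 29

  def nextCapacity(capacity: Int): Int = {
    val doubled = capacity * 2
    // `doubled < 0` catches Int overflow; either way, clamp to the cap.
    if (doubled < 0 || doubled > MaximumCapacity) MaximumCapacity else doubled
  }

  def main(args: Array[String]): Unit = {
    println(nextCapacity(64))               // 128
    println(nextCapacity(1 << 28))          // 536870912, i.e. exactly the cap
    println(nextCapacity(MaximumCapacity))  // stays clamped at the cap
  }
}
```

Once a collection is clamped at its cap it no longer grows; the patch instead throws an IllegalStateException on the insert that would exceed the maximum.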
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala                   | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala           | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala | 23
3 files changed, 53 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 290282c9c2..d215ee43cb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -32,12 +32,18 @@ import org.apache.spark.annotation.DeveloperApi
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
*
+ * The map can support up to `536870912 (2 ^ 29)` elements.
+ *
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
@DeveloperApi
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
extends Iterable[(K, V)] with Serializable {
- require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+
+ import AppendOnlyMap._
+
+ require(initialCapacity <= MAXIMUM_CAPACITY,
+ s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")
private val LOAD_FACTOR = 0.7
@@ -193,8 +199,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/** Increase table size by 1, rehashing if necessary */
private def incrementSize() {
+ if (curSize == MAXIMUM_CAPACITY) {
+ throw new IllegalStateException(s"Can't put more that ${MAXIMUM_CAPACITY} elements")
+ }
curSize += 1
- if (curSize > growThreshold) {
+ if (curSize > growThreshold && capacity < MAXIMUM_CAPACITY) {
growTable()
}
}
@@ -206,12 +215,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/** Double the table's size and re-hash everything */
protected def growTable() {
- val newCapacity = capacity * 2
- if (newCapacity >= (1 << 30)) {
- // We can't make the table this big because we want an array of 2x
- // that size for our data, but array sizes are at most Int.MaxValue
- throw new Exception("Can't make capacity bigger than 2^29 elements")
- }
+ // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
+ val newCapacity = (capacity * 2).min(MAXIMUM_CAPACITY)
val newData = new Array[AnyRef](2 * newCapacity)
val newMask = newCapacity - 1
// Insert all our old values into the new array. Note that because our old keys are
@@ -292,3 +297,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
*/
def atGrowThreshold: Boolean = curSize == growThreshold
}
+
+private object AppendOnlyMap {
+ val MAXIMUM_CAPACITY = (1 << 29)
+}
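A hedged aside on why `MAXIMUM_CAPACITY` is `1 << 29` here: AppendOnlyMap keeps keys and values interleaved in a single `Array[AnyRef]` of length `2 * capacity`, and array lengths are bounded by `Int.MaxValue`, so the next power of two (`1 << 30`) would already require a larger array. A sketch of the arithmetic only (`AppendOnlyMapBoundSketch` is an illustrative name, not part of the patch):

```scala
// Verifies the array-length bound that motivates MAXIMUM_CAPACITY = 1 << 29.
object AppendOnlyMapBoundSketch {
  def main(args: Array[String]): Unit = {
    println(2L * (1 << 29))  // 1073741824: a legal array length
    println(2L * (1 << 30))  // 2147483648: exceeds Int.MaxValue
    println(Int.MaxValue)    // 2147483647
  }
}
```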
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index 5a6e9a9580..04bb7fc78c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -25,11 +25,16 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
/**
* Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
* of its estimated size in bytes.
+ *
+ * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
*/
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
extends WritablePartitionedPairCollection[K, V] with SizeTracker
{
- require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ import PartitionedPairBuffer._
+
+ require(initialCapacity <= MAXIMUM_CAPACITY,
+ s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")
// Basic growable array data structure. We use a single array of AnyRef to hold both the keys
@@ -51,11 +56,15 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
/** Double the size of the array because we've reached capacity */
private def growArray(): Unit = {
- if (capacity == (1 << 29)) {
- // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
- throw new Exception("Can't grow buffer beyond 2^29 elements")
+ if (capacity >= MAXIMUM_CAPACITY) {
+ throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
}
- val newCapacity = capacity * 2
+ val newCapacity =
+ if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+ MAXIMUM_CAPACITY
+ } else {
+ capacity * 2
+ }
val newArray = new Array[AnyRef](2 * newCapacity)
System.arraycopy(data, 0, newArray, 0, 2 * capacity)
data = newArray
@@ -86,3 +95,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
}
}
}
+
+private object PartitionedPairBuffer {
+ val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+}
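The `capacity * 2 < 0` guard in `growArray` above relies on `Int` wrap-around: doubling any value of `1 << 30` or more yields a negative number. A small sketch of that behavior, plain `Int` arithmetic only and nothing Spark-specific (`OverflowGuardSketch` is an illustrative name):

```scala
// Demonstrates the Int wrap-around that the overflow guard checks for.
object OverflowGuardSketch {
  def main(args: Array[String]): Unit = {
    val maxCapacity = Int.MaxValue / 2  // 1073741823, the buffer's cap
    println((1 << 30) * 2)              // -2147483648: doubling 2^30 wraps negative
    println(((1 << 30) * 2) < 0)        // true, so the code would fall back to the cap
    println(maxCapacity * 2)            // 2147483646: doubling the cap itself still fits
  }
}
```

In PartitionedPairBuffer itself the capacity never exceeds `Int.MaxValue / 2`, so the check is largely defensive; the same pattern in `growMetaBuffer` below guards a meta buffer whose capacity can reach `1 << 30` ints, where doubling really does wrap.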
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
index 862408b7a4..ae9a48729e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
@@ -48,6 +48,8 @@ import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._
* | keyStart | keyValLen | partitionId |
* +-------------+------------+------------+-------------+
*
+ * The buffer can support up to `536870911 (2 ^ 29 - 1)` records.
+ *
* @param metaInitialRecords The initial number of entries in the metadata buffer.
* @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records.
* @param serializerInstance the serializer used for serializing inserted records.
@@ -63,6 +65,8 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
" Java-serialized objects.")
}
+ require(metaInitialRecords <= MAXIMUM_RECORDS,
+ s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records")
private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE)
private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize)
@@ -89,11 +93,17 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
/** Double the size of the array because we've reached capacity */
private def growMetaBuffer(): Unit = {
- if (metaBuffer.capacity.toLong * 2 > Int.MaxValue) {
- // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
- throw new Exception(s"Can't grow buffer beyond ${Int.MaxValue} bytes")
+ if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) {
+ throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_RECORDS} records")
}
- val newMetaBuffer = IntBuffer.allocate(metaBuffer.capacity * 2)
+ val newCapacity =
+ if (metaBuffer.capacity * 2 < 0 || metaBuffer.capacity * 2 > MAXIMUM_META_BUFFER_CAPACITY) {
+ // Overflow
+ MAXIMUM_META_BUFFER_CAPACITY
+ } else {
+ metaBuffer.capacity * 2
+ }
+ val newMetaBuffer = IntBuffer.allocate(newCapacity)
newMetaBuffer.put(metaBuffer.array)
metaBuffer = newMetaBuffer
}
@@ -247,12 +257,15 @@ private[spark] class SerializedSortDataFormat extends SortDataFormat[Int, IntBuf
}
}
-private[spark] object PartitionedSerializedPairBuffer {
+private object PartitionedSerializedPairBuffer {
val KEY_START = 0 // keyStart, a long, gets split across two ints
val KEY_VAL_LEN = 2
val PARTITION = 3
val RECORD_SIZE = PARTITION + 1 // num ints of metadata
+ val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE // (2 ^ 29) - 1
+ val MAXIMUM_META_BUFFER_CAPACITY = MAXIMUM_RECORDS * RECORD_SIZE // (2 ^ 31) - 4
+
def getKeyStartPos(metaBuffer: IntBuffer, metaBufferPos: Int): Long = {
val lower32 = metaBuffer.get(metaBufferPos + KEY_START)
val upper32 = metaBuffer.get(metaBufferPos + KEY_START + 1)
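For reference, the arithmetic behind the two new constants in PartitionedSerializedPairBuffer, stated as a sketch only; the values follow directly from `RECORD_SIZE = 4` and `Int.MaxValue = 2147483647` (`SerializedBufferLimitsSketch` is an illustrative name):

```scala
// Verifies the commented values of MAXIMUM_RECORDS and
// MAXIMUM_META_BUFFER_CAPACITY from the diff above.
object SerializedBufferLimitsSketch {
  val RECORD_SIZE = 4                               // 4 ints of metadata per record
  val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE  // 536870911 = (1 << 29) - 1
  val MAXIMUM_META_BUFFER_CAPACITY =
    MAXIMUM_RECORDS * RECORD_SIZE                   // 2147483644 = Int.MaxValue - 3

  def main(args: Array[String]): Unit = {
    println(MAXIMUM_RECORDS == (1 << 29) - 1)                  // true
    println(MAXIMUM_META_BUFFER_CAPACITY == Int.MaxValue - 3)  // true: (2 ^ 31) - 4
  }
}
```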