diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-31 17:19:02 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-31 17:19:02 -0800 |
commit | 53d8d36684b16ae536a5e065e690bb21b9aadc49 (patch) | |
tree | febc27409ba71fb7373970d0be133f07a9cd33c3 /core/src/main/scala/org | |
parent | 3ce22df954a17a582b5000b87db8fa887ad8392b (diff) | |
download | spark-53d8d36684b16ae536a5e065e690bb21b9aadc49.tar.gz spark-53d8d36684b16ae536a5e065e690bb21b9aadc49.tar.bz2 spark-53d8d36684b16ae536a5e065e690bb21b9aadc49.zip |
Add support and test for null keys in ExternalAppendOnlyMap
Also add safeguard against use of destructively sorted AppendOnlyMap
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala | 88 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 1 |
2 files changed, 57 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index a32416afae..d2a9574a71 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -48,10 +48,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi private var haveNullValue = false private var nullValue: V = null.asInstanceOf[V] + // Triggered by destructiveSortedIterator; the underlying data array may no longer be used + private var destroyed = false + private val LOAD_FACTOR = 0.7 /** Get the value for a given key */ def apply(key: K): V = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { return nullValue @@ -75,6 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi /** Set the value for a key */ def update(key: K, value: V): Unit = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { @@ -109,6 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi * for key, if any, or null otherwise. Returns the newly updated value. */ def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { + checkValidityOrThrowException() val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { @@ -142,35 +148,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } /** Iterator method from Iterable */ - override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] { - var pos = -1 + override def iterator: Iterator[(K, V)] = { + checkValidityOrThrowException() + new Iterator[(K, V)] { + var pos = -1 - /** Get the next value we should return from next(), or null if we're finished iterating */ - def nextValue(): (K, V) = { - if (pos == -1) { // Treat position -1 as looking at the null value - if (haveNullValue) { - return (null.asInstanceOf[K], nullValue) + /** Get the next value we should return from next(), or null if we're finished iterating */ + def nextValue(): (K, V) = { + if (pos == -1) { // Treat position -1 as looking at the null value + if (haveNullValue) { + return (null.asInstanceOf[K], nullValue) + } + pos += 1 } - pos += 1 - } - while (pos < capacity) { - if (!data(2 * pos).eq(null)) { - return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V]) + while (pos < capacity) { + if (!data(2 * pos).eq(null)) { + return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V]) + } + pos += 1 } - pos += 1 + null } - null - } - override def hasNext: Boolean = nextValue() != null + override def hasNext: Boolean = nextValue() != null - override def next(): (K, V) = { - val value = nextValue() - if (value == null) { - throw new NoSuchElementException("End of iterator") + override def next(): (K, V) = { + val value = nextValue() + if (value == null) { + throw new NoSuchElementException("End of iterator") + } + pos += 1 + value } - pos += 1 - value } } @@ -238,12 +247,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi if (highBit == n) n else highBit << 1 } - /** Return an iterator of the map in sorted order. This provides a way to sort the map without - * using additional memory, at the expense of destroying the validity of the map. - */ + /** + * Return an iterator of the map in sorted order. This provides a way to sort the map without + * using additional memory, at the expense of destroying the validity of the map. + */ def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = { - var keyIndex, newIndex = 0 + destroyed = true // Pack KV pairs into the front of the underlying array + var keyIndex, newIndex = 0 while (keyIndex < capacity) { if (data(2 * keyIndex) != null) { data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1)) @@ -251,23 +262,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } keyIndex += 1 } - assert(newIndex == curSize) + assert(curSize == newIndex + (if (haveNullValue) 1 else 0)) + // Sort by the given ordering val rawOrdering = new Comparator[AnyRef] { def compare(x: AnyRef, y: AnyRef): Int = { cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) } } - util.Arrays.sort(data, 0, curSize, rawOrdering) + util.Arrays.sort(data, 0, newIndex, rawOrdering) new Iterator[(K, V)] { var i = 0 - def hasNext = i < curSize + var nullValueReady = haveNullValue + def hasNext: Boolean = (i < newIndex || nullValueReady) def next(): (K, V) = { - val item = data(i).asInstanceOf[(K, V)] - i += 1 - item + if (nullValueReady) { + nullValueReady = false + (null.asInstanceOf[K], nullValue) + } else { + val item = data(i).asInstanceOf[(K, V)] + i += 1 + item + } } } } + + private def checkValidityOrThrowException(): Unit = { + if (destroyed) { + throw new IllegalStateException("Map state is invalid from destructive sorting!") + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 317a6c168c..492b4fc7c6 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -112,6 +112,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( private var currentMap = new SizeTrackingAppendOnlyMap[K, G] private val oldMaps = new ArrayBuffer[DiskKGIterator] + private val memoryThresholdMB = { val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat |