author     Andrew Or <andrewor14@gmail.com>   2013-12-31 17:19:02 -0800
committer  Andrew Or <andrewor14@gmail.com>   2013-12-31 17:19:02 -0800
commit     53d8d36684b16ae536a5e065e690bb21b9aadc49 (patch)
tree       febc27409ba71fb7373970d0be133f07a9cd33c3 /core/src/main/scala/org
parent     3ce22df954a17a582b5000b87db8fa887ad8392b (diff)
Add support and test for null keys in ExternalAppendOnlyMap
Also add safeguard against use of destructively sorted AppendOnlyMap
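
For reviewers, a minimal sketch of the new behavior. This is not part of the diff itself, and it assumes AppendOnlyMap is visible from the calling package:

import java.util.Comparator
import org.apache.spark.util.collection.AppendOnlyMap

val map = new AppendOnlyMap[String, Int]()
map.update("a", 1)
map.update(null, 2)                  // null keys are now first-class entries
assert(map(null) == 2)

val cmp = new Comparator[(String, Int)] {
  // The null-key entry is emitted by the iterator itself and never reaches
  // this comparator, so a plain compareTo is safe here.
  def compare(x: (String, Int), y: (String, Int)): Int = x._1.compareTo(y._1)
}
val sorted = map.destructiveSortedIterator(cmp).toList   // List((null, 2), ("a", 1))

// The new safeguard: any further apply/update/iterator call now throws
// IllegalStateException("Map state is invalid from destructive sorting!")
// map("a")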
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala         | 88
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala |  1
2 files changed, 57 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index a32416afae..d2a9574a71 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -48,10 +48,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   private var haveNullValue = false
   private var nullValue: V = null.asInstanceOf[V]
 
+  // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
+  private var destroyed = false
+
   private val LOAD_FACTOR = 0.7
 
   /** Get the value for a given key */
   def apply(key: K): V = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       return nullValue
@@ -75,6 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
 
   /** Set the value for a key */
   def update(key: K, value: V): Unit = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       if (!haveNullValue) {
@@ -109,6 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
    * for key, if any, or null otherwise. Returns the newly updated value.
    */
   def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       if (!haveNullValue) {
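
A hedged usage sketch of changeValue for aggregation (hypothetical, not from this commit): the callback receives (hadValue, oldValue) and returns the value to store.

val counts = new AppendOnlyMap[String, Int]()
val addOne: (Boolean, Int) => Int = (hadValue, old) => if (hadValue) old + 1 else 1
counts.changeValue("spark", addOne)
counts.changeValue("spark", addOne)   // counts("spark") == 2
counts.changeValue(null, addOne)      // the null key participates like any other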
@@ -142,35 +148,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   }
 
   /** Iterator method from Iterable */
-  override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
-    var pos = -1
+  override def iterator: Iterator[(K, V)] = {
+    checkValidityOrThrowException()
+    new Iterator[(K, V)] {
+      var pos = -1
 
-    /** Get the next value we should return from next(), or null if we're finished iterating */
-    def nextValue(): (K, V) = {
-      if (pos == -1) { // Treat position -1 as looking at the null value
-        if (haveNullValue) {
-          return (null.asInstanceOf[K], nullValue)
+      /** Get the next value we should return from next(), or null if we're finished iterating */
+      def nextValue(): (K, V) = {
+        if (pos == -1) { // Treat position -1 as looking at the null value
+          if (haveNullValue) {
+            return (null.asInstanceOf[K], nullValue)
+          }
+          pos += 1
         }
-        pos += 1
-      }
-      while (pos < capacity) {
-        if (!data(2 * pos).eq(null)) {
-          return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+        while (pos < capacity) {
+          if (!data(2 * pos).eq(null)) {
+            return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+          }
+          pos += 1
         }
-        pos += 1
+        null
       }
-      null
-    }
 
-    override def hasNext: Boolean = nextValue() != null
+      override def hasNext: Boolean = nextValue() != null
 
-    override def next(): (K, V) = {
-      val value = nextValue()
-      if (value == null) {
-        throw new NoSuchElementException("End of iterator")
+      override def next(): (K, V) = {
+        val value = nextValue()
+        if (value == null) {
+          throw new NoSuchElementException("End of iterator")
+        }
+        pos += 1
+        value
       }
-      pos += 1
-      value
     }
   }
 
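
The rewrite above keeps the flat-array scan: keys live at data(2 * pos), values at data(2 * pos + 1), and the optional null key is surfaced at the pseudo-position -1 before the scan begins. A simplified standalone model of that scheme (hypothetical names, not the real class):

class FlatPairScan(data: Array[AnyRef], capacity: Int,
                   haveNullValue: Boolean, nullValue: AnyRef) {
  def entries: Iterator[(AnyRef, AnyRef)] = {
    val nullEntry =
      if (haveNullValue) Iterator((null: AnyRef, nullValue)) else Iterator.empty
    val slots = (0 until capacity).iterator
      .filter(i => data(2 * i) ne null)            // skip empty hash slots
      .map(i => (data(2 * i), data(2 * i + 1)))
    nullEntry ++ slots                             // null entry first, then slot order
  }
}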
@@ -238,12 +247,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     if (highBit == n) n else highBit << 1
   }
 
-  /** Return an iterator of the map in sorted order. This provides a way to sort the map without
-   * using additional memory, at the expense of destroying the validity of the map.
-   */
+  /**
+   * Return an iterator of the map in sorted order. This provides a way to sort the map without
+   * using additional memory, at the expense of destroying the validity of the map.
+   */
   def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
-    var keyIndex, newIndex = 0
+    destroyed = true
     // Pack KV pairs into the front of the underlying array
+    var keyIndex, newIndex = 0
     while (keyIndex < capacity) {
       if (data(2 * keyIndex) != null) {
         data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
@@ -251,23 +262,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       }
       keyIndex += 1
     }
-    assert(newIndex == curSize)
+    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
+
     // Sort by the given ordering
     val rawOrdering = new Comparator[AnyRef] {
       def compare(x: AnyRef, y: AnyRef): Int = {
         cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
       }
     }
-    util.Arrays.sort(data, 0, curSize, rawOrdering)
+    util.Arrays.sort(data, 0, newIndex, rawOrdering)
 
     new Iterator[(K, V)] {
       var i = 0
-      def hasNext = i < curSize
+      var nullValueReady = haveNullValue
+      def hasNext: Boolean = (i < newIndex || nullValueReady)
       def next(): (K, V) = {
-        val item = data(i).asInstanceOf[(K, V)]
-        i += 1
-        item
+        if (nullValueReady) {
+          nullValueReady = false
+          (null.asInstanceOf[K], nullValue)
+        } else {
+          val item = data(i).asInstanceOf[(K, V)]
+          i += 1
+          item
+        }
       }
     }
   }
+
+  private def checkValidityOrThrowException(): Unit = {
+    if (destroyed) {
+      throw new IllegalStateException("Map state is invalid from destructive sorting!")
+    }
+  }
 }
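
The destructiveSortedIterator hunks above work by packing the live key/value slots into tuples at the front of the same array, sorting only that packed prefix, and emitting the out-of-band null entry first, so no second array is allocated. A standalone model of the packing-and-sort step (hypothetical helper, simplified from the diff):

import java.util.{Arrays, Comparator}

// Packs live pairs to the front of `data` and sorts that prefix in place.
// Returns the number of packed (non-null-key) entries. Reads at 2*keyIndex
// always stay ahead of writes at newIndex, so nothing is clobbered.
def packAndSort(data: Array[AnyRef], capacity: Int, cmp: Comparator[AnyRef]): Int = {
  var keyIndex, newIndex = 0
  while (keyIndex < capacity) {
    if (data(2 * keyIndex) ne null) {
      data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
      newIndex += 1
    }
    keyIndex += 1
  }
  Arrays.sort(data, 0, newIndex, cmp)   // sort only the packed prefix
  newIndex
}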
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 317a6c168c..492b4fc7c6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -112,6 +112,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
   private val oldMaps = new ArrayBuffer[DiskKGIterator]
+
   private val memoryThresholdMB = {
     val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
     val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
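
The hunk is truncated here and shows only the two property reads. Presumably the spill threshold combines them (size times fraction), but that combination is an assumption, not shown in this diff:

// Assumption: the truncated code derives the threshold roughly as
// bufferSize * bufferPercent; only the two property reads appear above.
val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
val memoryThresholdMB = (bufferSize * bufferPercent).toLong   // 1024 * 0.8 = 819 MB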