aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2013-12-26 21:22:38 -0800
committerAndrew Or <andrewor14@gmail.com>2013-12-26 23:40:08 -0800
commitec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a (patch)
tree5aacec9b295542f45c5349186b1d0816e27b3173 /core
parenta515706d9cb2c94ed981d9015026331aaf582f36 (diff)
downloadspark-ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a.tar.gz
spark-ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a.tar.bz2
spark-ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a.zip
Sort AppendOnlyMap in-place
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala32
2 files changed, 51 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index cb0ca8f8c1..38f3c556ae 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.collection
+import java.util
+
/**
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
@@ -234,4 +236,35 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
+
+ // Return an iterator of the map in sorted order.
+ // Note that the validity of the map is no longer preserved.
+ def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = {
+ var keyIndex, newIndex = 0
+ while (keyIndex < capacity) {
+ if (data(2 * keyIndex) != null) {
+ data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+ newIndex += 1
+ }
+ keyIndex += 1
+ }
+ // sort
+ assert(newIndex == curSize)
+ val rawOrdering = new Ordering[AnyRef] {
+ def compare(x: AnyRef, y: AnyRef): Int ={
+ ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+ }
+ }
+ util.Arrays.sort(data, 0, curSize, rawOrdering)
+
+ new Iterator[(K, V)] {
+ var i = 0
+ def hasNext = i < curSize
+ def next(): (K, V) = {
+ val item = data(i).asInstanceOf[(K, V)]
+ i += 1
+ item
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4bda763ffe..ed8b1d36a9 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -88,6 +88,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
bufferSize * bufferPercent
}
+ val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode())
def insert(key: K, value: V): Unit = {
val update: (Boolean, M) => M = (hadVal, oldVal) => {
@@ -100,10 +101,14 @@ class SpillableAppendOnlyMap[K, V, M, C](
}
def spill(): Unit = {
+ println("******************* SPILL *********************")
val file = File.createTempFile("external_append_only_map", "")
val out = new ObjectOutputStream(new FileOutputStream(file))
- val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
- sortedMap.foreach(out.writeObject)
+ val it = currentMap.destructiveSortedIterator(KMOrdering)
+ while (it.hasNext) {
+ val kv = it.next()
+ out.writeObject(kv)
+ }
out.close()
currentMap = new SizeTrackingAppendOnlyMap[K, M]
oldMaps.append(new DiskIterator(file))
@@ -115,8 +120,8 @@ class SpillableAppendOnlyMap[K, V, M, C](
class ExternalIterator extends Iterator[(K, C)] {
// Order by key hash value
- val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
- val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
+ val pq = new PriorityQueue[KMITuple]
+ val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps
inputStreams.foreach(readFromIterator)
// Read from the given iterator until a key of different hash is retrieved
@@ -127,7 +132,10 @@ class SpillableAppendOnlyMap[K, V, M, C](
pq.enqueue(KMITuple(k, m, it))
minHash match {
case None => minHash = Some(k.hashCode())
- case Some(expectedHash) if k.hashCode() != expectedHash => return
+ case Some(expectedHash) =>
+ if (k.hashCode() != expectedHash) {
+ return
+ }
}
}
}
@@ -156,15 +164,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
(minKey, createCombiner(minGroup))
}
- case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
- }
-
- // Iterate through (K, M) pairs in sorted order from the in-memory map
- class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
- val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
- val it = sortedMap.iterator
- override def hasNext: Boolean = it.hasNext
- override def next(): (K, M) = it.next()
+ case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] {
+ def compare(other: KMITuple): Int = {
+ -key.hashCode().compareTo(other.key.hashCode())
+ }
+ }
}
// Iterate through (K, M) pairs in sorted order from an on-disk map