diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-29 22:03:47 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-29 22:03:47 -0800 |
commit | 015a510b0a456d98d791fb2c7dc3c1448f7db2ab (patch) | |
tree | 115377cc1c0a5cc1cd9193d725522cc5c68fff13 /core | |
parent | 2a48d71528f8be31068b9d60f161bf8e8b32c659 (diff) | |
parent | e3cac47e65bd57fb8fec298eaf9412203d525e68 (diff) | |
download | spark-015a510b0a456d98d791fb2c7dc3c1448f7db2ab.tar.gz spark-015a510b0a456d98d791fb2c7dc3c1448f7db2ab.tar.bz2 spark-015a510b0a456d98d791fb2c7dc3c1448f7db2ab.zip |
Merge branch 'master' of github.com:andrewor14/incubator-spark
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 13 |
2 files changed, 11 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 7810119847..a32416afae 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import java.util +import java.util.Comparator /** * A simple open hash table optimized for the append-only use case, where keys @@ -240,7 +241,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi /** Return an iterator of the map in sorted order. This provides a way to sort the map without * using additional memory, at the expense of destroying the validity of the map. */ - def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = { + def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = { var keyIndex, newIndex = 0 // Pack KV pairs into the front of the underlying array while (keyIndex < capacity) { @@ -252,9 +253,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } assert(newIndex == curSize) // Sort by the given ordering - val rawOrdering = new Ordering[AnyRef] { + val rawOrdering = new Comparator[AnyRef] { def compare(x: AnyRef, y: AnyRef): Int = { - ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) + cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) } } util.Arrays.sort(data, 0, curSize, rawOrdering) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index b15cae1259..ac9431cb0d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import java.io._ +import java.util.Comparator import scala.collection.mutable.{ArrayBuffer, PriorityQueue} import scala.reflect.ClassTag @@ -113,7 +114,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat bufferSize * bufferPercent } - private val ordering = new KeyGroupOrdering[K, G] + private val comparator = new KeyGroupComparator[K, G] private val ser = serializer.newInstance() def insert(key: K, value: V): Unit = { @@ -129,7 +130,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( private def spill(): Unit = { val file = File.createTempFile("external_append_only_map", "") val out = ser.serializeStream(new FileOutputStream(file)) - val it = currentMap.destructiveSortedIterator(ordering) + val it = currentMap.destructiveSortedIterator(comparator) while (it.hasNext) { val kv = it.next() out.writeObject(kv) @@ -150,7 +151,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs private class ExternalIterator extends Iterator[(K, C)] { val mergeHeap = new PriorityQueue[KGITuple] - val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering)) + val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator)) // Invariant: size of mergeHeap == number of input streams inputStreams.foreach{ it => @@ -241,7 +242,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)]) - extends Ordered[KGITuple] { + extends Comparable[KGITuple] { // Invariant: pairs are ordered by key hash def minHash: Int = { @@ -252,7 +253,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } } - def compare(other: KGITuple): Int = { + override def compareTo(other: KGITuple): Int = { // mutable.PriorityQueue dequeues the max, not the min -minHash.compareTo(other.minHash) } @@ -299,7 +300,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } private[spark] object SpillableAppendOnlyMap { - private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] { + private class KeyGroupComparator[K, G] extends Comparator[(K, G)] { def compare(kg1: (K, G), kg2: (K, G)): Int = { kg1._1.hashCode().compareTo(kg2._1.hashCode()) } |