author     Aaron Davidson <aaron@databricks.com>  2013-12-29 19:54:53 -0800
committer  Aaron Davidson <aaron@databricks.com>  2013-12-29 19:58:37 -0800
commit     e3cac47e65bd57fb8fec298eaf9412203d525e68 (patch)
tree       e72bd8101724f38d8ea7b16a2b8ac9c238357a55 /core
parent     8fbff9f5d04064b870e372db0e3885e3fbf28222 (diff)
Use Comparator instead of Ordering

Lowers object creation costs.
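Annotation (not part of the commit): scala.math.Ordering[T] already mixes in java.util.Comparator[T], so narrowing these signatures to Comparator turns no callers away, while a caller that only needs compare() can supply a bare Comparator without any Ordering machinery on top. A minimal sketch, with illustrative names:

  import java.util.Comparator

  object CompatSketch {
    // Any API declared against Comparator still accepts every Ordering,
    // because Ordering extends Comparator.
    def takesComparator[T](cmp: Comparator[T], a: T, b: T): Int = cmp.compare(a, b)

    def main(args: Array[String]): Unit = {
      val ord: Ordering[Int] = Ordering.Int
      println(takesComparator(ord, 1, 2))    // an Ordering passes unchanged
      val bare = new Comparator[Int] {       // a bare Comparator: one small
        def compare(a: Int, b: Int): Int =   // anonymous object, nothing more
          java.lang.Integer.compare(a, b)
      }
      println(takesComparator(bare, 2, 1))
    }
  }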
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala          |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala  | 13
2 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 7810119847..a32416afae 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection
 
 import java.util
+import java.util.Comparator
 
 /**
  * A simple open hash table optimized for the append-only use case, where keys
@@ -240,7 +241,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   /** Return an iterator of the map in sorted order. This provides a way to sort the map without
    * using additional memory, at the expense of destroying the validity of the map.
    */
-  def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = {
+  def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
     var keyIndex, newIndex = 0
     // Pack KV pairs into the front of the underlying array
     while (keyIndex < capacity) {
@@ -252,9 +253,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     }
     assert(newIndex == curSize)
     // Sort by the given ordering
-    val rawOrdering = new Ordering[AnyRef] {
+    val rawOrdering = new Comparator[AnyRef] {
       def compare(x: AnyRef, y: AnyRef): Int = {
-        ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+        cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
       }
     }
     util.Arrays.sort(data, 0, curSize, rawOrdering)
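Annotation (not from the commit; names and layout are a hypothetical simplification): a condensed model of the destructive sort this hunk touches. Live (K, V) pairs are compacted to the front of the hash table's backing array, that prefix is sorted in place through a raw Comparator[AnyRef] wrapper, and the table's hash structure is sacrificed for a zero-copy sorted iterator:

  import java.util.Comparator

  class TinySketchMap[K, V](capacity: Int) {
    val data = new Array[AnyRef](capacity)  // slots hold a (K, V) pair or null

    def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
      // Pack occupied slots into the front of the underlying array
      var keyIndex, newIndex = 0
      while (keyIndex < capacity) {
        if (data(keyIndex) != null) {
          data(newIndex) = data(keyIndex)
          newIndex += 1
        }
        keyIndex += 1
      }
      // Wrap the typed comparator so Arrays.sort can order the raw slots
      val raw = new Comparator[AnyRef] {
        def compare(x: AnyRef, y: AnyRef): Int =
          cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
      }
      java.util.Arrays.sort(data, 0, newIndex, raw)
      data.iterator.take(newIndex).map(_.asInstanceOf[(K, V)])
    }
  }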
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1de545c05b..bbf96f71ce 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection
 
 import java.io._
+import java.util.Comparator
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 import scala.reflect.ClassTag
@@ -113,7 +114,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
     bufferSize * bufferPercent
   }
-  private val ordering = new KeyGroupOrdering[K, G]
+  private val comparator = new KeyGroupComparator[K, G]
   private val ser = serializer.newInstance()
 
   def insert(key: K, value: V): Unit = {
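Annotation (the buffer size below is assumed, not from the commit): the maxBufferSize in this hunk is the spill threshold, i.e. the buffer size scaled by spark.shuffle.buffer.fraction (default 0.8):

  // With an assumed 1 GB buffer and the default fraction of 0.8, the
  // in-memory map spills once its estimated size passes ~819 MiB.
  val bufferSize    = 1024L * 1024 * 1024
  val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
  val maxBufferSize = bufferSize * bufferPercent   // ~858993459 bytes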
@@ -129,7 +130,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   private def spill(): Unit = {
     val file = File.createTempFile("external_append_only_map", "")
     val out = ser.serializeStream(new FileOutputStream(file))
-    val it = currentMap.destructiveSortedIterator(ordering)
+    val it = currentMap.destructiveSortedIterator(comparator)
     while (it.hasNext) {
       val kv = it.next()
       out.writeObject(kv)
@@ -150,7 +151,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
   private class ExternalIterator extends Iterator[(K, C)] {
     val mergeHeap = new PriorityQueue[KGITuple]
-    val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering))
+    val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator))
 
     // Invariant: size of mergeHeap == number of input streams
     inputStreams.foreach{ it =>
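Annotation (simplified types, not from the commit): the heap-based sort-merge above is the classic k-way merge. Each input stream is already sorted, the heap is keyed on each stream's current head, and the comparison is negated because mutable.PriorityQueue dequeues the max. A self-contained sketch with Int "hashes" standing in for (K, G) pairs:

  import scala.collection.mutable.PriorityQueue

  case class StreamHead(head: Int, rest: Iterator[Int]) extends Comparable[StreamHead] {
    // mutable.PriorityQueue dequeues the max, not the min, hence the negation
    override def compareTo(other: StreamHead): Int = -head.compareTo(other.head)
  }

  def mergeSorted(streams: Seq[Iterator[Int]]): Iterator[Int] = {
    val heap = PriorityQueue.empty[StreamHead]   // Ordering derived from Comparable
    streams.foreach { it => if (it.hasNext) heap.enqueue(StreamHead(it.next(), it)) }
    new Iterator[Int] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): Int = {
        val StreamHead(h, rest) = heap.dequeue() // smallest head across all streams
        if (rest.hasNext) heap.enqueue(StreamHead(rest.next(), rest))
        h
      }
    }
  }

  // e.g. mergeSorted(Seq(Iterator(1, 4), Iterator(2, 3))).toList == List(1, 2, 3, 4)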
@@ -241,7 +242,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     }
 
     case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
-      extends Ordered[KGITuple] {
+      extends Comparable[KGITuple] {
 
       // Invariant: pairs are ordered by key hash
       def minHash: Int = {
@@ -252,7 +253,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         }
       }
 
-      def compare(other: KGITuple): Int = {
+      override def compareTo(other: KGITuple): Int = {
         // mutable.PriorityQueue dequeues the max, not the min
         -minHash.compareTo(other.minHash)
       }
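Annotation (hash values are assumed for illustration): a quick worked check of the negation in the context line above:

  // self.minHash = 3, other.minHash = 5:
  //   3.compareTo(5) == -1, so compareTo returns +1
  // The tuple with the smaller hash therefore ranks as the "max" and is
  // dequeued first, turning the max-oriented queue into a min-heap by hash.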
@@ -299,7 +300,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
 }
 
 private[spark] object SpillableAppendOnlyMap {
-  private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] {
+  private class KeyGroupComparator[K, G] extends Comparator[(K, G)] {
     def compare(kg1: (K, G), kg2: (K, G)): Int = {
       kg1._1.hashCode().compareTo(kg2._1.hashCode())
     }
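Annotation (example values are ours, not from the commit): KeyGroupComparator orders pairs by key hash alone, so distinct keys that collide compare as equal. That is why the merge path above only maintains the invariant "pairs are ordered by key hash" and must still match keys by real equality within a hash bucket:

  val cmp = new java.util.Comparator[(String, Int)] {
    def compare(a: (String, Int), b: (String, Int)): Int =
      a._1.hashCode().compareTo(b._1.hashCode())
  }
  // "Aa" and "BB" share a hash code on the JVM:
  println(cmp.compare(("Aa", 1), ("BB", 2)))  // 0, yet the keys differ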