-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala       | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala  | 39
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 170f09be21..288badd316 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
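
Aside (illustrative, not part of the patch): Scala's Iterator.buffered wraps any Iterator in a scala.collection.BufferedIterator, whose head peeks at the next element without consuming it. That peek is what the rest of the patch builds on:

    val it: BufferedIterator[Int] = Iterator(1, 2, 3).buffered
    assert(it.head == 1)   // peek: the element is still pending
    assert(it.head == 1)   // peeking is idempotent
    assert(it.next() == 1) // next() actually consumes it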
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
     * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
     * Assume the given iterator is in sorted order.
     */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
         }
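
This hunk is the heart of the fix. The old loop tested the hash of the pair it had already consumed (kc), so the very next pair was read before its hash was ever checked; a pair with a different hash could be pulled into the current group and hidden from the merge. With a BufferedIterator the loop peeks at it.head before consuming. A standalone sketch of the difference (not Spark code; the second tuple element stands in for the key's hash code):

    import scala.collection.mutable.ArrayBuffer

    def buggyGroup(it: Iterator[(String, Int)]): ArrayBuffer[(String, Int)] = {
      val group = new ArrayBuffer[(String, Int)]
      var kc = it.next(); group += kc
      val minHash = kc._2
      // Tests the pair already taken, so the next pair is consumed unchecked
      while (it.hasNext && kc._2 == minHash) { kc = it.next(); group += kc }
      group
    }

    def fixedGroup(it: BufferedIterator[(String, Int)]): ArrayBuffer[(String, Int)] = {
      val group = new ArrayBuffer[(String, Int)]
      var kc = it.next(); group += kc
      val minHash = kc._2
      // Peeks before consuming, so only true hash collisions are taken
      while (it.hasNext && it.head._2 == minHash) { kc = it.next(); group += kc }
      group
    }

    val pairs = Seq(("a", 1), ("b", 1), ("c", 2))
    println(buggyGroup(pairs.iterator))          // ArrayBuffer((a,1), (b,1), (c,2)) -- (c,2) leaks in
    println(fixedGroup(pairs.iterator.buffered)) // ArrayBuffer((a,1), (b,1))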
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
     *
     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
     */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0
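
For context, StreamBuffers live in a mutable.PriorityQueue during the merge, and the comment above gives their ordering contract: buffers compare by the minimum key hash among their buffered pairs. The patch also makes StreamBuffer a plain class, which avoids the structural equals/hashCode a case class would derive from its mutable fields. A hedged sketch of what such an ordering can look like (Buf and minKeyHash are illustrative names, not the patch's own code):

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    // Comparison is reversed because mutable.PriorityQueue dequeues its
    // maximum element, while the merge wants the smallest hash first.
    class Buf(val pairs: ArrayBuffer[(String, Int)]) extends Comparable[Buf] {
      def minKeyHash: Int = pairs.head._1.hashCode()
      override def compareTo(other: Buf): Int =
        java.lang.Integer.compare(other.minKeyHash, minKeyHash)
    }

    val heap = new mutable.PriorityQueue[Buf]()
    heap += new Buf(ArrayBuffer(("zebra", 1)))
    heap += new Buf(ArrayBuffer(("ant", 1)))
    println(heap.dequeue().pairs.head._1) // the buffer whose head key hashes lowest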
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index cdebefb675..deb7809535 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)
+ collisionPairs.foreach { case (w1, w2) =>
+ // String.hashCode is documented to use a specific algorithm, but check just in case
+ assert(w1.hashCode === w2.hashCode)
+ }
+
(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(kv._2.equals(expectedValue))
       count += 1
     }
-    assert(count == 100000 + collisionPairs.size * 2)
+    assert(count === 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with many hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.0001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+    // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+    // problems if the map fails to group together the objects with the same code (SPARK-2043).
+    for (i <- 1 to 10) {
+      for (j <- 1 to 10000) {
+        map.insert(FixedHashObject(j, j % 2), 1)
+      }
+    }
+
+    val it = map.iterator
+    var count = 0
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._2 === 10)
+      count += 1
+    }
+    assert(count === 10000)
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+  override def hashCode(): Int = h
+}
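
A quick illustration (not in the patch) of why a case class works well here: equality stays structural on (v, h), so distinct keys still collide on hash code alone, which is exactly the situation the new test exercises:

    val a = FixedHashObject(1, 0)
    val b = FixedHashObject(2, 0)
    assert(a.hashCode == b.hashCode) // forced collision
    assert(a != b)                   // but still distinct keys in the map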