aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-07-30 18:07:59 -0700
committerReynold Xin <rxin@apache.org>2014-07-30 18:07:59 -0700
commite966284409f9355e1169960e73a2215617c8cb22 (patch)
tree2e2ad582ff8fa55a8d1cc747cf8a833c3de77dff /core/src/test
parentda501766834453c9ac7095c7e8c930151f87cf11 (diff)
downloadspark-e966284409f9355e1169960e73a2215617c8cb22.tar.gz
spark-e966284409f9355e1169960e73a2215617c8cb22.tar.bz2
spark-e966284409f9355e1169960e73a2215617c8cb22.zip
SPARK-2045 Sort-based shuffle
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.) The main TODOs still left are: - [x] enabling ExternalSorter to merge across spilled files - [x] with an Ordering - [x] without an Ordering, using the keys' hash codes - [x] adding more tests (e.g. a version of our shuffle suite that runs on this) - [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged - [x] disabling spilling if spark.shuffle.spill is set to false Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback. After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`. Author: Matei Zaharia <matei@databricks.com> Closes #1499 from mateiz/sort-based-shuffle and squashes the following commits: bd841f9 [Matei Zaharia] Various review comments d1c137fd [Matei Zaharia] Various review comments a611159 [Matei Zaharia] Compile fixes due to rebase 62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s. f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic) 9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase 0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test 03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle 3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer 44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes 5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data: 5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition) e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it) c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark] 4988d16 [Matei Zaharia] tweak c1b7572 [Matei Zaharia] Small optimization ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering 4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given e1f84be [Matei Zaharia] Fix disk block manager test 5a40a1c [Matei Zaharia] More tests 614f1b4 [Matei Zaharia] Add spill metrics to map tasks cc52caf [Matei Zaharia] Add more error handling and tests for error cases bbf359d [Matei Zaharia] More work 3a56341 [Matei Zaharia] More partial work towards sort-based shuffle 7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala186
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/SortShuffleSuite.scala34
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala25
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala566
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/FixedHashObject.scala25
9 files changed, 788 insertions, 84 deletions
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d1cb2d9d3a..a41914a1a9 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testRDD(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD[Int, Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, Int](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index ad20f9b937..4bc4346c0a 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark
import java.lang.ref.WeakReference
-import org.apache.spark.broadcast.Broadcast
-
-import scala.collection.mutable
import scala.collection.mutable.{HashSet, SynchronizedSet}
import scala.language.existentials
import scala.language.postfixOps
@@ -34,15 +31,28 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, BroadcastBlockId, RDDBlockId, ShuffleBlockId}
-
-class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
+import org.apache.spark.storage._
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.storage.BroadcastBlockId
+import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.storage.ShuffleIndexBlockId
+
+/**
+ * An abstract base class for context cleaner tests, which sets up a context with a config
+ * suitable for cleaner tests and provides some utility functions. Subclasses can use different
+ * config options, in particular, a different shuffle manager class
+ */
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+ extends FunSuite with BeforeAndAfter with LocalSparkContext
+{
implicit val defaultTimeout = timeout(10000 millis)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.shuffle.manager", shuffleManager.getName)
before {
sc = new SparkContext(conf)
@@ -55,6 +65,59 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
}
+ //------ Helper functions ------
+
+ protected def newRDD() = sc.makeRDD(1 to 10)
+ protected def newPairRDD() = newRDD().map(_ -> 1)
+ protected def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
+ protected def newBroadcast() = sc.broadcast(1 to 100)
+
+ protected def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
+ def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
+ rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
+ getAllDependencies(dep.rdd)
+ }
+ }
+ val rdd = newShuffleRDD()
+
+ // Get all the shuffle dependencies
+ val shuffleDeps = getAllDependencies(rdd)
+ .filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
+ .map(_.asInstanceOf[ShuffleDependency[_, _, _]])
+ (rdd, shuffleDeps)
+ }
+
+ protected def randomRdd() = {
+ val rdd: RDD[_] = Random.nextInt(3) match {
+ case 0 => newRDD()
+ case 1 => newShuffleRDD()
+ case 2 => newPairRDD.join(newPairRDD())
+ }
+ if (Random.nextBoolean()) rdd.persist()
+ rdd.count()
+ rdd
+ }
+
+ /** Run GC and make sure it actually has run */
+ protected def runGC() {
+ val weakRef = new WeakReference(new Object())
+ val startTime = System.currentTimeMillis
+ System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
+ // Wait until a weak reference object has been GCed
+ while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+ System.gc()
+ Thread.sleep(200)
+ }
+ }
+
+ protected def cleaner = sc.cleaner.get
+}
+
+
+/**
+ * Basic ContextCleanerSuite, which uses sort-based shuffle
+ */
+class ContextCleanerSuite extends ContextCleanerSuiteBase {
test("cleanup RDD") {
val rdd = newRDD().persist()
val collected = rdd.collect().toList
@@ -147,7 +210,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = broadcastBuffer.map(_.id)
@@ -180,12 +243,13 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.shuffle.manager", shuffleManager.getName)
sc = new SparkContext(conf2)
val numRdds = 10
val numBroadcasts = 4 // Broadcasts are more costly
val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = broadcastBuffer.map(_.id)
@@ -210,57 +274,82 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
case _ => false
}, askSlaves = true).isEmpty)
}
+}
- //------ Helper functions ------
- private def newRDD() = sc.makeRDD(1 to 10)
- private def newPairRDD() = newRDD().map(_ -> 1)
- private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
- private def newBroadcast() = sc.broadcast(1 to 100)
+/**
+ * A copy of the shuffle tests for sort-based shuffle
+ */
+class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
+ test("cleanup shuffle") {
+ val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
+ val collected = rdd.collect().toList
+ val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
- private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
- def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
- rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
- getAllDependencies(dep.rdd)
- }
- }
- val rdd = newShuffleRDD()
+ // Explicit cleanup
+ shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
+ tester.assertCleanup()
- // Get all the shuffle dependencies
- val shuffleDeps = getAllDependencies(rdd)
- .filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
- .map(_.asInstanceOf[ShuffleDependency[_, _, _]])
- (rdd, shuffleDeps)
+ // Verify that shuffles can be re-executed after cleaning up
+ assert(rdd.collect().toList.equals(collected))
}
- private def randomRdd() = {
- val rdd: RDD[_] = Random.nextInt(3) match {
- case 0 => newRDD()
- case 1 => newShuffleRDD()
- case 2 => newPairRDD.join(newPairRDD())
- }
- if (Random.nextBoolean()) rdd.persist()
+ test("automatically cleanup shuffle") {
+ var rdd = newShuffleRDD()
rdd.count()
- rdd
- }
- private def randomBroadcast() = {
- sc.broadcast(Random.nextInt(Int.MaxValue))
+ // Test that GC does not cause shuffle cleanup due to a strong reference
+ val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
+ runGC()
+ intercept[Exception] {
+ preGCTester.assertCleanup()(timeout(1000 millis))
+ }
+
+ // Test that GC causes shuffle cleanup after dereferencing the RDD
+ val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
+ rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
+ runGC()
+ postGCTester.assertCleanup()
}
- /** Run GC and make sure it actually has run */
- private def runGC() {
- val weakRef = new WeakReference(new Object())
- val startTime = System.currentTimeMillis
- System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
- // Wait until a weak reference object has been GCed
- while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
- System.gc()
- Thread.sleep(200)
+ test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
+ sc.stop()
+
+ val conf2 = new SparkConf()
+ .setMaster("local-cluster[2, 1, 512]")
+ .setAppName("ContextCleanerSuite")
+ .set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.shuffle.manager", shuffleManager.getName)
+ sc = new SparkContext(conf2)
+
+ val numRdds = 10
+ val numBroadcasts = 4 // Broadcasts are more costly
+ val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
+ val rddIds = sc.persistentRdds.keys.toSeq
+ val shuffleIds = 0 until sc.newShuffleId()
+ val broadcastIds = broadcastBuffer.map(_.id)
+
+ val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
+ runGC()
+ intercept[Exception] {
+ preGCTester.assertCleanup()(timeout(1000 millis))
}
- }
- private def cleaner = sc.cleaner.get
+ // Test that GC triggers the cleanup of all variables after the dereferencing them
+ val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
+ broadcastBuffer.clear()
+ rddBuffer.clear()
+ runGC()
+ postGCTester.assertCleanup()
+
+ // Make sure the broadcasted task closure no longer exists after GC.
+ val taskClosureBroadcastId = broadcastIds.max + 1
+ assert(sc.env.blockManager.master.getMatchingBlockIds({
+ case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
+ case _ => false
+ }, askSlaves = true).isEmpty)
+ }
}
@@ -418,6 +507,7 @@ class CleanerTester(
private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
case ShuffleBlockId(`shuffleId`, _, _) => true
+ case ShuffleIndexBlockId(`shuffleId`, _, _) => true
case _ => false
}, askSlaves = true)
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 47df00050c..d7b2d2e1e3 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -28,6 +28,6 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
}
override def afterAll() {
- System.setProperty("spark.shuffle.use.netty", "false")
+ System.clearProperty("spark.shuffle.use.netty")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index eae67c7747..b13ddf96bc 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -58,8 +58,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// default Java serializer cannot handle the non serializable class.
val c = new ShuffledRDD[Int,
NonJavaSerializableClass,
- NonJavaSerializableClass,
- (Int, NonJavaSerializableClass)](b, new HashPartitioner(NUM_BLOCKS))
+ NonJavaSerializableClass](b, new HashPartitioner(NUM_BLOCKS))
c.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -83,8 +82,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// default Java serializer cannot handle the non serializable class.
val c = new ShuffledRDD[Int,
NonJavaSerializableClass,
- NonJavaSerializableClass,
- (Int, NonJavaSerializableClass)](b, new HashPartitioner(3))
+ NonJavaSerializableClass](b, new HashPartitioner(3))
c.setSerializer(new KryoSerializer(conf))
assert(c.count === 10)
}
@@ -100,7 +98,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -126,7 +124,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
@@ -141,19 +139,19 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
assert(nonEmptyBlocks.size <= 4)
}
- test("shuffle using mutable pairs") {
+ test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test")
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
- val results = new ShuffledRDD[Int, Int, Int, MutablePair[Int, Int]](pairs,
+ val results = new ShuffledRDD[Int, Int, Int](pairs,
new HashPartitioner(2)).collect()
- data.foreach { pair => results should contain (pair) }
+ data.foreach { pair => results should contain ((pair._1, pair._2)) }
}
- test("sorting using mutable pairs") {
+ test("sorting on mutable pairs") {
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test")
@@ -162,10 +160,10 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
.sortByKey().collect()
- results(0) should be (p(1, 11))
- results(1) should be (p(2, 22))
- results(2) should be (p(3, 33))
- results(3) should be (p(100, 100))
+ results(0) should be ((1, 11))
+ results(1) should be ((2, 22))
+ results(2) should be ((3, 33))
+ results(3) should be ((100, 100))
}
test("cogroup using mutable pairs") {
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
new file mode 100644
index 0000000000..5c02c00586
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.BeforeAndAfterAll
+
+class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
+
+ // This test suite should run all tests in ShuffleSuite with sort-based shuffle.
+
+ override def beforeAll() {
+ System.setProperty("spark.shuffle.manager",
+ "org.apache.spark.shuffle.sort.SortShuffleManager")
+ }
+
+ override def afterAll() {
+ System.clearProperty("spark.shuffle.manager")
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 4953d565ae..8966eedd80 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -270,7 +270,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
- asInstanceOf[ShuffledRDD[_, _, _, _]] != null
+ asInstanceOf[ShuffledRDD[_, _, _]] != null
assert(isEquals)
// when shuffling, we can increase the number of partitions
@@ -730,9 +730,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Any ancestors before the shuffle are not considered
assert(ancestors4.size === 0)
- assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 0)
+ assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
assert(ancestors5.size === 3)
- assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 1)
+ assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 0b7ad184a4..7de5df6e1c 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -208,11 +208,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val resultA = rddA.reduceByKey(math.max).collect()
assert(resultA.length == 50000)
resultA.foreach { case(k, v) =>
- k match {
- case 0 => assert(v == 1)
- case 25000 => assert(v == 50001)
- case 49999 => assert(v == 99999)
- case _ =>
+ if (v != k * 2 + 1) {
+ fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
}
}
@@ -221,11 +218,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val resultB = rddB.groupByKey().collect()
assert(resultB.length == 25000)
resultB.foreach { case(i, seq) =>
- i match {
- case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
- case 12500 => assert(seq.toSet == Set[Int](50000, 50001, 50002, 50003))
- case 24999 => assert(seq.toSet == Set[Int](99996, 99997, 99998, 99999))
- case _ =>
+ val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
+ if (seq.toSet != expected) {
+ fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
}
}
@@ -239,6 +234,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
case 0 =>
assert(seq1.toSet == Set[Int](0))
assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ case 1 =>
+ assert(seq1.toSet == Set[Int](1))
+ assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
case 5000 =>
assert(seq1.toSet == Set[Int](5000))
assert(seq2.toSet == Set[Int]())
@@ -369,10 +367,3 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
-
-/**
- * A dummy class that always returns the same hash code, to easily test hash collisions
- */
-case class FixedHashObject(v: Int, h: Int) extends Serializable {
- override def hashCode(): Int = h
-}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
new file mode 100644
index 0000000000..ddb5df4036
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class ExternalSorterSuite extends FunSuite with LocalSparkContext {
+ test("empty data stream") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val ord = implicitly[Ordering[Int]]
+
+ // Both aggregator and ordering
+ val sorter = new ExternalSorter[Int, Int, Int](
+ Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
+ assert(sorter.iterator.toSeq === Seq())
+ sorter.stop()
+
+ // Only aggregator
+ val sorter2 = new ExternalSorter[Int, Int, Int](
+ Some(agg), Some(new HashPartitioner(3)), None, None)
+ assert(sorter2.iterator.toSeq === Seq())
+ sorter2.stop()
+
+ // Only ordering
+ val sorter3 = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(3)), Some(ord), None)
+ assert(sorter3.iterator.toSeq === Seq())
+ sorter3.stop()
+
+ // Neither aggregator nor ordering
+ val sorter4 = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(3)), None, None)
+ assert(sorter4.iterator.toSeq === Seq())
+ sorter4.stop()
+ }
+
+ test("few elements per partition") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val ord = implicitly[Ordering[Int]]
+ val elements = Set((1, 1), (2, 2), (5, 5))
+ val expected = Set(
+ (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()),
+ (5, Set((5, 5))), (6, Set()))
+
+ // Both aggregator and ordering
+ val sorter = new ExternalSorter[Int, Int, Int](
+ Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
+ sorter.write(elements.iterator)
+ assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
+ sorter.stop()
+
+ // Only aggregator
+ val sorter2 = new ExternalSorter[Int, Int, Int](
+ Some(agg), Some(new HashPartitioner(7)), None, None)
+ sorter2.write(elements.iterator)
+ assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
+ sorter2.stop()
+
+ // Only ordering
+ val sorter3 = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(7)), Some(ord), None)
+ sorter3.write(elements.iterator)
+ assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
+ sorter3.stop()
+
+ // Neither aggregator nor ordering
+ val sorter4 = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(7)), None, None)
+ sorter4.write(elements.iterator)
+ assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
+ sorter4.stop()
+ }
+
+ test("empty partitions with spilling") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val ord = implicitly[Ordering[Int]]
+ val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
+
+ val sorter = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(7)), None, None)
+ sorter.write(elements)
+ assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
+ val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
+ assert(iter.next() === (0, Nil))
+ assert(iter.next() === (1, List((1, 1))))
+ assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList))
+ assert(iter.next() === (3, Nil))
+ assert(iter.next() === (4, Nil))
+ assert(iter.next() === (5, List((5, 5))))
+ assert(iter.next() === (6, Nil))
+ sorter.stop()
+ }
+
+ test("spilling in local cluster") {
+ val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ // reduceByKey - should spill ~8 times
+ val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
+ val resultA = rddA.reduceByKey(math.max).collect()
+ assert(resultA.length == 50000)
+ resultA.foreach { case(k, v) =>
+ if (v != k * 2 + 1) {
+ fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
+ }
+ }
+
+ // groupByKey - should spill ~17 times
+ val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
+ val resultB = rddB.groupByKey().collect()
+ assert(resultB.length == 25000)
+ resultB.foreach { case(i, seq) =>
+ val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
+ if (seq.toSet != expected) {
+ fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
+ }
+ }
+
+ // cogroup - should spill ~7 times
+ val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
+ val resultC = rddC1.cogroup(rddC2).collect()
+ assert(resultC.length == 10000)
+ resultC.foreach { case(i, (seq1, seq2)) =>
+ i match {
+ case 0 =>
+ assert(seq1.toSet == Set[Int](0))
+ assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ case 1 =>
+ assert(seq1.toSet == Set[Int](1))
+ assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
+ case 5000 =>
+ assert(seq1.toSet == Set[Int](5000))
+ assert(seq2.toSet == Set[Int]())
+ case 9999 =>
+ assert(seq1.toSet == Set[Int](9999))
+ assert(seq2.toSet == Set[Int]())
+ case _ =>
+ }
+ }
+
+ // larger cogroup - should spill ~7 times
+ val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val resultD = rddD1.cogroup(rddD2).collect()
+ assert(resultD.length == 5000)
+ resultD.foreach { case(i, (seq1, seq2)) =>
+ val expected = Set(i * 2, i * 2 + 1)
+ if (seq1.toSet != expected) {
+ fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
+ }
+ if (seq2.toSet != expected) {
+ fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
+ }
+ }
+ }
+
+ test("spilling in local cluster with many reduce tasks") {
+ val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
+ // reduceByKey - should spill ~4 times per executor
+ val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
+ val resultA = rddA.reduceByKey(math.max _, 100).collect()
+ assert(resultA.length == 50000)
+ resultA.foreach { case(k, v) =>
+ if (v != k * 2 + 1) {
+ fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
+ }
+ }
+
+ // groupByKey - should spill ~8 times per executor
+ val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
+ val resultB = rddB.groupByKey(100).collect()
+ assert(resultB.length == 25000)
+ resultB.foreach { case(i, seq) =>
+ val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
+ if (seq.toSet != expected) {
+ fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
+ }
+ }
+
+ // cogroup - should spill ~4 times per executor
+ val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
+ val resultC = rddC1.cogroup(rddC2, 100).collect()
+ assert(resultC.length == 10000)
+ resultC.foreach { case(i, (seq1, seq2)) =>
+ i match {
+ case 0 =>
+ assert(seq1.toSet == Set[Int](0))
+ assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ case 1 =>
+ assert(seq1.toSet == Set[Int](1))
+ assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
+ case 5000 =>
+ assert(seq1.toSet == Set[Int](5000))
+ assert(seq2.toSet == Set[Int]())
+ case 9999 =>
+ assert(seq1.toSet == Set[Int](9999))
+ assert(seq2.toSet == Set[Int]())
+ case _ =>
+ }
+ }
+
+ // larger cogroup - should spill ~4 times per executor
+ val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val resultD = rddD1.cogroup(rddD2).collect()
+ assert(resultD.length == 5000)
+ resultD.foreach { case(i, (seq1, seq2)) =>
+ val expected = Set(i * 2, i * 2 + 1)
+ if (seq1.toSet != expected) {
+ fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
+ }
+ if (seq2.toSet != expected) {
+ fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
+ }
+ }
+ }
+
+ test("cleanup of intermediate files in sorter") {
+ val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+ val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+
+ val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
+ sorter.write((0 until 100000).iterator.map(i => (i, i)))
+ assert(diskBlockManager.getAllFiles().length > 0)
+ sorter.stop()
+ assert(diskBlockManager.getAllBlocks().length === 0)
+
+ val sorter2 = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
+ sorter2.write((0 until 100000).iterator.map(i => (i, i)))
+ assert(diskBlockManager.getAllFiles().length > 0)
+ assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
+ sorter2.stop()
+ assert(diskBlockManager.getAllBlocks().length === 0)
+ }
+
+ test("cleanup of intermediate files in sorter if there are errors") {
+ val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+ val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+
+ val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
+ intercept[SparkException] {
+ sorter.write((0 until 100000).iterator.map(i => {
+ if (i == 99990) {
+ throw new SparkException("Intentional failure")
+ }
+ (i, i)
+ }))
+ }
+ assert(diskBlockManager.getAllFiles().length > 0)
+ sorter.stop()
+ assert(diskBlockManager.getAllBlocks().length === 0)
+ }
+
+ test("cleanup of intermediate files in shuffle") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+ val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+
+ val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
+ assert(data.reduceByKey(_ + _).count() === 100000)
+
+ // After the shuffle, there should be only 4 files on disk: our two map output files and
+ // their index files. All other intermediate files should've been deleted.
+ assert(diskBlockManager.getAllFiles().length === 4)
+ }
+
+ test("cleanup of intermediate files in shuffle with errors") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+ val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+
+ val data = sc.parallelize(0 until 100000, 2).map(i => {
+ if (i == 99990) {
+ throw new Exception("Intentional failure")
+ }
+ (i, i)
+ })
+ intercept[SparkException] {
+ data.reduceByKey(_ + _).count()
+ }
+
+ // After the shuffle, there should be only 2 files on disk: the output of task 1 and its index.
+ // All other files (map 2's output and intermediate merge files) should've been deleted.
+ assert(diskBlockManager.getAllFiles().length === 2)
+ }
+
+ test("no partial aggregation or sorting") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
+ sorter.write((0 until 100000).iterator.map(i => (i / 4, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
+ }).toSet
+ assert(results === expected)
+ }
+
+ test("partial aggregation without spill") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
+ sorter.write((0 until 100).iterator.map(i => (i / 2, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
+ }).toSet
+ assert(results === expected)
+ }
+
+ test("partial aggregation with spill, no ordering") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
+ sorter.write((0 until 100000).iterator.map(i => (i / 2, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
+ }).toSet
+ assert(results === expected)
+ }
+
+ test("partial aggregation with spill, with ordering") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
+ val ord = implicitly[Ordering[Int]]
+ val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
+ sorter.write((0 until 100000).iterator.map(i => (i / 2, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
+ }).toSet
+ assert(results === expected)
+ }
+
+ test("sorting without aggregation, no spill") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val ord = implicitly[Ordering[Int]]
+ val sorter = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(3)), Some(ord), None)
+ sorter.write((0 until 100).iterator.map(i => (i, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
+ }).toSeq
+ assert(results === expected)
+ }
+
+ test("sorting without aggregation, with spill") {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
+ sc = new SparkContext("local", "test", conf)
+
+ val ord = implicitly[Ordering[Int]]
+ val sorter = new ExternalSorter[Int, Int, Int](
+ None, Some(new HashPartitioner(3)), Some(ord), None)
+ sorter.write((0 until 100000).iterator.map(i => (i, i)))
+ val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
+ val expected = (0 until 3).map(p => {
+ (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
+ }).toSeq
+ assert(results === expected)
+ }
+
+ test("spilling with hash collisions") {
+ val conf = new SparkConf(true)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ def createCombiner(i: String) = ArrayBuffer[String](i)
+ def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
+ def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
+ buffer1 ++= buffer2
+
+ val agg = new Aggregator[String, String, ArrayBuffer[String]](
+ createCombiner _, mergeValue _, mergeCombiners _)
+
+ val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
+ Some(agg), None, None, None)
+
+ val collisionPairs = Seq(
+ ("Aa", "BB"), // 2112
+ ("to", "v1"), // 3707
+ ("variants", "gelato"), // -1249574770
+ ("Teheran", "Siblings"), // 231609873
+ ("misused", "horsemints"), // 1069518484
+ ("isohel", "epistolaries"), // -1179291542
+ ("righto", "buzzards"), // -931102253
+ ("hierarch", "crinolines"), // -1732884796
+ ("inwork", "hypercatalexes"), // -1183663690
+ ("wainages", "presentencing"), // 240183619
+ ("trichothecenes", "locular"), // 339006536
+ ("pomatoes", "eructation") // 568647356
+ )
+
+ collisionPairs.foreach { case (w1, w2) =>
+ // String.hashCode is documented to use a specific algorithm, but check just in case
+ assert(w1.hashCode === w2.hashCode)
+ }
+
+ val toInsert = (1 to 100000).iterator.map(_.toString).map(s => (s, s)) ++
+ collisionPairs.iterator ++ collisionPairs.iterator.map(_.swap)
+
+ sorter.write(toInsert)
+
+ // A map of collision pairs in both directions
+ val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
+
+ // Avoid map.size or map.iterator.length because this destructively sorts the underlying map
+ var count = 0
+
+ val it = sorter.iterator
+ while (it.hasNext) {
+ val kv = it.next()
+ val expectedValue = ArrayBuffer[String](collisionPairsMap.getOrElse(kv._1, kv._1))
+ assert(kv._2.equals(expectedValue))
+ count += 1
+ }
+ assert(count === 100000 + collisionPairs.size * 2)
+ }
+
+ test("spilling with many hash collisions") {
+ val conf = new SparkConf(true)
+ conf.set("spark.shuffle.memoryFraction", "0.0001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+ val sorter = new ExternalSorter[FixedHashObject, Int, Int](Some(agg), None, None, None)
+
+ // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+ // problems if the map fails to group together the objects with the same code (SPARK-2043).
+ val toInsert = for (i <- 1 to 10; j <- 1 to 10000) yield (FixedHashObject(j, j % 2), 1)
+ sorter.write(toInsert.iterator)
+
+ val it = sorter.iterator
+ var count = 0
+ while (it.hasNext) {
+ val kv = it.next()
+ assert(kv._2 === 10)
+ count += 1
+ }
+ assert(count === 10000)
+ }
+
+ test("spilling with hash collisions using the Int.MaxValue key") {
+ val conf = new SparkConf(true)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ def createCombiner(i: Int) = ArrayBuffer[Int](i)
+ def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
+ def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+
+ val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
+ val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)
+
+ sorter.write((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
+
+ val it = sorter.iterator
+ while (it.hasNext) {
+ // Should not throw NoSuchElementException
+ it.next()
+ }
+ }
+
+ test("spilling with null keys and values") {
+ val conf = new SparkConf(true)
+ conf.set("spark.shuffle.memoryFraction", "0.001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ def createCombiner(i: String) = ArrayBuffer[String](i)
+ def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
+ def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String]) = buf1 ++= buf2
+
+ val agg = new Aggregator[String, String, ArrayBuffer[String]](
+ createCombiner, mergeValue, mergeCombiners)
+
+ val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
+ Some(agg), None, None, None)
+
+ sorter.write((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
+ (null.asInstanceOf[String], "1"),
+ ("1", null.asInstanceOf[String]),
+ (null.asInstanceOf[String], null.asInstanceOf[String])
+ ))
+
+ val it = sorter.iterator
+ while (it.hasNext) {
+ // Should not throw NullPointerException
+ it.next()
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/FixedHashObject.scala b/core/src/test/scala/org/apache/spark/util/collection/FixedHashObject.scala
new file mode 100644
index 0000000000..c787b5f066
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/FixedHashObject.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(v: Int, h: Int) extends Serializable {
+ override def hashCode(): Int = h
+}