author     Andrew Or <andrewor14@gmail.com>          2014-08-28 17:05:21 -0700
committer  Patrick Wendell <pwendell@gmail.com>      2014-08-28 17:05:21 -0700
commit     a46b8f2d710d82ba3a212cac64b610a67b8798f9 (patch)
tree       50cfb2e6c5c6a55948c6cb95425e7a3624ffb60c /core
parent     92af2314f27e80227174499f2fca505bd551cda7 (diff)
[SPARK-3277] Fix external spilling with LZ4 assertion error
**Summary of the changes**

The bulk of this PR consists of tests and documentation; the actual fix is a single line of code (see `BlockObjectWriter.scala`). We currently do not run the `External*` test suites with different compression codecs, which would have caught the bug reported in [SPARK-3277](https://issues.apache.org/jira/browse/SPARK-3277). This PR extends the existing code to test spilling with every compression codec known to Spark, including `LZ4`.

**The bug itself**

In `DiskBlockObjectWriter`, we only report the shuffle bytes written before we close the streams. With `LZ4`, all the bytes reported by our write metrics were 0 because `flush()` was not taking effect for some reason. In general, compression codecs may write additional bytes to the file after we call `close()`, so we must also capture those bytes in our shuffle write metrics.

Thanks mridulm and pwendell for help with debugging.

Author: Andrew Or <andrewor14@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #2187 from andrewor14/fix-lz4-spilling and squashes the following commits:

1b54bdc [Andrew Or] Speed up tests by not compressing everything
1c4624e [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-lz4-spilling
6b2e7d1 [Andrew Or] Fix compilation error
92e251b [Patrick Wendell] Better documentation for BlockObjectWriter.
a1ad536 [Andrew Or] Fix tests
089593f [Andrew Or] Actually fix SPARK-3277 (tests still fail)
4bbcf68 [Andrew Or] Update tests to actually test all compression codecs
b264a84 [Andrew Or] ExternalAppendOnlyMapSuite code style fixes (minor)
1bfa743 [Andrew Or] Add more information to assert for better debugging
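For readers skimming the diff below, here is a minimal hedged sketch of the idea behind the one-line fix. This is not the Spark code itself: `TrackedWriter`, `wrap`, and `bytesWrittenMetric` are hypothetical stand-ins for `DiskBlockObjectWriter`, `compressStream`, and `ShuffleWriteMetrics`.

```scala
import java.io.{File, FileOutputStream, OutputStream}

// Hypothetical writer that appends to a file through a (possibly compressing) stream.
class TrackedWriter(file: File, wrap: OutputStream => OutputStream) {
  // Offset where this writer starts appending (existing file contents end here).
  private val initialPosition: Long = file.length()
  // Last position that has been folded into the metric; updates are deltas.
  private var reportedPosition: Long = initialPosition
  // Stands in for the shared shuffle write metric.
  var bytesWrittenMetric: Long = 0L

  private val fos = new FileOutputStream(file, true)
  private val out = wrap(fos)

  def write(bytes: Array[Byte]): Unit = {
    out.write(bytes)
    // Periodic metric update; only meaningful while the streams are still open.
    out.flush()
    val pos = fos.getChannel.position()
    bytesWrittenMetric += pos - reportedPosition
    reportedPosition = pos
  }

  def commitAndClose(): Long = {
    out.close()                              // a codec may append trailer bytes here
    val finalPosition = file.length()
    // The essence of the fix: also count the bytes that appear only after close().
    bytesWrittenMetric += finalPosition - reportedPosition
    bytesWrittenMetric
  }
}
```

Wrapping `fos` with a compressing stream (for example an LZ4 output stream) makes the gap between `reportedPosition` and `finalPosition` non-zero, which is exactly what the metrics previously missed.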
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/io/CompressionCodec.scala                        |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala                  |  37
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala      |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala              |   5
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 190
5 files changed, 144 insertions, 96 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index ef9c43ecf1..1ac7f4e448 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -64,6 +64,7 @@ private[spark] object CompressionCodec {
}
val DEFAULT_COMPRESSION_CODEC = "snappy"
+ val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index adda971fd7..9c469370ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -65,8 +65,6 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
/**
* BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
- * The given write metrics will be updated incrementally, but will not necessarily be current until
- * commitAndClose is called.
*/
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
@@ -75,6 +73,8 @@ private[spark] class DiskBlockObjectWriter(
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
+ // These write metrics are concurrently shared with other active BlockObjectWriters that
+ // are themselves performing writes. All updates must be relative.
writeMetrics: ShuffleWriteMetrics)
extends BlockObjectWriter(blockId)
with Logging
@@ -94,14 +94,30 @@ private[spark] class DiskBlockObjectWriter(
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
+ private var initialized = false
+
+ /**
+ * Cursors used to represent positions in the file.
+ *
+ *   xxxxxxxx|--------|---       |
+ *           ^        ^          ^
+ *           |        |          finalPosition
+ *           |        reportedPosition
+ *           initialPosition
+ *
+ * initialPosition: Offset in the file where we start writing. Immutable.
+ * reportedPosition: Position at the time of the last update to the write metrics.
+ * finalPosition: Offset where we stopped writing. Set on commitAndClose(), then never changed.
+ * -----: Current writes to the underlying file.
+ * xxxxx: Existing contents of the file.
+ */
private val initialPosition = file.length()
private var finalPosition: Long = -1
- private var initialized = false
+ private var reportedPosition = initialPosition
/** Calling channel.position() to update the write metrics can be a little bit expensive, so we
* only call it every N writes */
private var writesSinceMetricsUpdate = 0
- private var lastPosition = initialPosition
override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
@@ -140,17 +156,18 @@ private[spark] class DiskBlockObjectWriter(
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
- updateBytesWritten()
close()
}
finalPosition = file.length()
+ // In certain compression codecs, more bytes are written after close() is called
+ writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
}
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
- writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)
+ writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
if (initialized) {
objOut.flush()
@@ -189,10 +206,14 @@ private[spark] class DiskBlockObjectWriter(
new FileSegment(file, initialPosition, finalPosition - initialPosition)
}
+ /**
+ * Report the number of bytes written in this writer's shuffle write metrics.
+ * Note that this is only valid before the underlying streams are closed.
+ */
private def updateBytesWritten() {
val pos = channel.position()
- writeMetrics.shuffleBytesWritten += (pos - lastPosition)
- lastPosition = pos
+ writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+ reportedPosition = pos
}
private def callWithTiming(f: => Unit) = {
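The comment about the shared write metrics is easy to miss in the diff above, so here is a hedged, standalone sketch of why updates must be relative deltas when several writers report into one counter. `SharedMetrics` and `RelativeReporter` are invented names, not Spark classes.

```scala
// One metrics object shared by several writers: each writer must add only its
// own delta rather than overwriting the counter with an absolute value.
class SharedMetrics { var shuffleBytesWritten: Long = 0L }

class RelativeReporter(metrics: SharedMetrics) {
  private var reportedPosition: Long = 0L
  def report(currentPosition: Long): Unit = {
    metrics.shuffleBytesWritten += currentPosition - reportedPosition  // delta, not absolute
    reportedPosition = currentPosition
  }
}

object RelativeUpdateDemo extends App {
  val metrics = new SharedMetrics
  val a = new RelativeReporter(metrics)
  val b = new RelativeReporter(metrics)
  a.report(100L); b.report(40L); a.report(150L)
  assert(metrics.shuffleBytesWritten == 190L)  // 150 bytes from a + 40 bytes from b
}
```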
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9f85b94a70..8a015c1d26 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -413,7 +413,12 @@ class ExternalAppendOnlyMap[K, V, C](
extends Iterator[(K, C)]
{
private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1
- assert(file.length() == batchOffsets(batchOffsets.length - 1))
+ assert(file.length() == batchOffsets.last,
+ "File length is not equal to the last batch offset:\n" +
+ s" file length = ${file.length}\n" +
+ s" last batch offset = ${batchOffsets.last}\n" +
+ s" all batch offsets = ${batchOffsets.mkString(",")}"
+ )
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
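For context on the strengthened assertion above, a small hedged example (the batch sizes are made up) of the `scanLeft` invariant it relies on:

```scala
// scanLeft(0L)(_ + _) produces cumulative offsets with one more element than
// the input, and the last offset equals the total number of bytes spilled.
object BatchOffsetsDemo extends App {
  val batchSizes = Seq(120L, 95L, 210L)              // hypothetical per-batch byte counts
  val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Seq(0, 120, 215, 425)
  assert(batchOffsets.length == batchSizes.length + 1)
  assert(batchOffsets.last == batchSizes.sum)        // should equal the spill file length
}
```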
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d81499ac6a..6b6e0104e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -33,10 +33,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
private val fileSystem = Utils.getHadoopFileSystem("/")
- private val allCompressionCodecs = Seq[String](
- "org.apache.spark.io.LZFCompressionCodec",
- "org.apache.spark.io.SnappyCompressionCodec"
- )
+ private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private var testDir: File = _
before {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 04d7338488..ac3931e3d0 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -23,37 +23,42 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.SparkContext._
+import org.apache.spark.io.CompressionCodec
class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
+ private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
+ private def createCombiner[T](i: T) = ArrayBuffer[T](i)
+ private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
+ private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
+ buf1 ++= buf2
- private def createCombiner(i: Int) = ArrayBuffer[Int](i)
- private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
- private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+ private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
+ createCombiner[T], mergeValue[T], mergeCombiners[T])
- private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+ private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = {
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
+ codec.foreach { c => conf.set("spark.io.compression.codec", c) }
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
conf
}
test("simple insert") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
-
- val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map = createExternalMap[Int]
// Single insert
map.insert(1, 10)
var it = map.iterator
assert(it.hasNext)
val kv = it.next()
- assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
+ assert(kv._1 === 1 && kv._2 === ArrayBuffer[Int](10))
assert(!it.hasNext)
// Multiple insert
@@ -61,18 +66,17 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
map.insert(3, 30)
it = map.iterator
assert(it.hasNext)
- assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
+ assert(it.toSet === Set[(Int, ArrayBuffer[Int])](
(1, ArrayBuffer[Int](10)),
(2, ArrayBuffer[Int](20)),
(3, ArrayBuffer[Int](30))))
+ sc.stop()
}
test("insert with collision") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
-
- val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map = createExternalMap[Int]
map.insertAll(Seq(
(1, 10),
@@ -84,30 +88,28 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val it = map.iterator
assert(it.hasNext)
val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
- assert(result == Set[(Int, Set[Int])](
+ assert(result === Set[(Int, Set[Int])](
(1, Set[Int](10, 100, 1000)),
(2, Set[Int](20, 200)),
(3, Set[Int](30))))
+ sc.stop()
}
test("ordering") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
- val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map1 = createExternalMap[Int]
map1.insert(1, 10)
map1.insert(2, 20)
map1.insert(3, 30)
- val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map2 = createExternalMap[Int]
map2.insert(2, 20)
map2.insert(3, 30)
map2.insert(1, 10)
- val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map3 = createExternalMap[Int]
map3.insert(3, 30)
map3.insert(1, 10)
map3.insert(2, 20)
@@ -119,33 +121,33 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
var kv1 = it1.next()
var kv2 = it2.next()
var kv3 = it3.next()
- assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
- assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+ assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+ assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
kv1 = it1.next()
kv2 = it2.next()
kv3 = it3.next()
- assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
- assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+ assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+ assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
kv1 = it1.next()
kv2 = it2.next()
kv3 = it3.next()
- assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
- assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+ assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+ assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
+ sc.stop()
}
test("null keys and values") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
- val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
- mergeValue, mergeCombiners)
+ val map = createExternalMap[Int]
map.insert(1, 5)
map.insert(2, 6)
map.insert(3, 7)
assert(map.size === 3)
- assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ assert(map.iterator.toSet === Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7))
@@ -155,7 +157,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val nullInt = null.asInstanceOf[Int]
map.insert(nullInt, 8)
assert(map.size === 4)
- assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ assert(map.iterator.toSet === Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7)),
@@ -167,32 +169,34 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
map.insert(nullInt, nullInt)
assert(map.size === 5)
val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
- assert(result == Set[(Int, Set[Int])](
+ assert(result === Set[(Int, Set[Int])](
(1, Set[Int](5)),
(2, Set[Int](6)),
(3, Set[Int](7)),
(4, Set[Int](nullInt)),
(nullInt, Set[Int](nullInt, 8))
))
+ sc.stop()
}
test("simple aggregator") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
val result1 = rdd.reduceByKey(_+_).collect()
- assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
+ assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5)))
// groupByKey
val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
- assert(result2.toSet == Set[(Int, Seq[Int])]
+ assert(result2.toSet === Set[(Int, Seq[Int])]
((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
+ sc.stop()
}
test("simple cogroup") {
- val conf = createSparkConf(false)
+ val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -200,77 +204,98 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
result.foreach { case (i, (seq1, seq2)) =>
i match {
- case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
- case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
- case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
- case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
- case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
+ case 0 => assert(seq1.toSet === Set[Int]() && seq2.toSet === Set[Int](2, 4))
+ case 1 => assert(seq1.toSet === Set[Int](1) && seq2.toSet === Set[Int](1, 3))
+ case 2 => assert(seq1.toSet === Set[Int](2) && seq2.toSet === Set[Int]())
+ case 3 => assert(seq1.toSet === Set[Int](3) && seq2.toSet === Set[Int]())
+ case 4 => assert(seq1.toSet === Set[Int](4) && seq2.toSet === Set[Int]())
}
}
+ sc.stop()
}
test("spilling") {
- val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ testSimpleSpilling()
+ }
+
+ test("spilling with compression") {
+ // Keep track of which compression codec we're using to report in test failure messages
+ var lastCompressionCodec: Option[String] = None
+ try {
+ allCompressionCodecs.foreach { c =>
+ lastCompressionCodec = Some(c)
+ testSimpleSpilling(Some(c))
+ }
+ } catch {
+ // Include compression codec used in test failure message
+ // We need to catch Throwable here because assertion failures are not covered by Exceptions
+ case t: Throwable =>
+ val compressionMessage = lastCompressionCodec
+ .map { c => "with compression using codec " + c }
+ .getOrElse("without compression")
+ val newException = new Exception(s"Test failed $compressionMessage:\n\n${t.getMessage}")
+ newException.setStackTrace(t.getStackTrace)
+ throw newException
+ }
+ }
+
+ /**
+ * Test spilling through simple aggregations and cogroups.
+ * If a compression codec is provided, use it. Otherwise, do not compress spills.
+ */
+ private def testSimpleSpilling(codec: Option[String] = None): Unit = {
+ val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
// reduceByKey - should spill ~8 times
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max).collect()
- assert(resultA.length == 50000)
- resultA.foreach { case(k, v) =>
- if (v != k * 2 + 1) {
- fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
- }
+ assert(resultA.length === 50000)
+ resultA.foreach { case (k, v) =>
+ assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v")
}
// groupByKey - should spill ~17 times
val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
- assert(resultB.length == 25000)
- resultB.foreach { case(i, seq) =>
+ assert(resultB.length === 25000)
+ resultB.foreach { case (i, seq) =>
val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
- if (seq.toSet != expected) {
- fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
- }
+ assert(seq.toSet === expected,
+ s"Value for $i was wrong: expected $expected, got ${seq.toSet}")
}
// cogroup - should spill ~7 times
val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
val resultC = rddC1.cogroup(rddC2).collect()
- assert(resultC.length == 10000)
- resultC.foreach { case(i, (seq1, seq2)) =>
+ assert(resultC.length === 10000)
+ resultC.foreach { case (i, (seq1, seq2)) =>
i match {
case 0 =>
- assert(seq1.toSet == Set[Int](0))
- assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ assert(seq1.toSet === Set[Int](0))
+ assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
case 1 =>
- assert(seq1.toSet == Set[Int](1))
- assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
+ assert(seq1.toSet === Set[Int](1))
+ assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
case 5000 =>
- assert(seq1.toSet == Set[Int](5000))
- assert(seq2.toSet == Set[Int]())
+ assert(seq1.toSet === Set[Int](5000))
+ assert(seq2.toSet === Set[Int]())
case 9999 =>
- assert(seq1.toSet == Set[Int](9999))
- assert(seq2.toSet == Set[Int]())
+ assert(seq1.toSet === Set[Int](9999))
+ assert(seq2.toSet === Set[Int]())
case _ =>
}
}
+ sc.stop()
}
test("spilling with hash collisions") {
- val conf = createSparkConf(true)
+ val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
- def createCombiner(i: String) = ArrayBuffer[String](i)
- def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
- def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
- buffer1 ++= buffer2
-
- val map = new ExternalAppendOnlyMap[String, String, ArrayBuffer[String]](
- createCombiner, mergeValue, mergeCombiners)
+ val map = createExternalMap[String]
val collisionPairs = Seq(
("Aa", "BB"), // 2112
@@ -312,13 +337,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
count += 1
}
assert(count === 100000 + collisionPairs.size * 2)
+ sc.stop()
}
test("spilling with many hash collisions") {
- val conf = createSparkConf(true)
+ val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
@@ -337,15 +362,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
count += 1
}
assert(count === 10000)
+ sc.stop()
}
test("spilling with hash collisions using the Int.MaxValue key") {
- val conf = createSparkConf(true)
+ val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
- val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
- createCombiner, mergeValue, mergeCombiners)
+ val map = createExternalMap[Int]
(1 to 100000).foreach { i => map.insert(i, i) }
map.insert(Int.MaxValue, Int.MaxValue)
@@ -355,15 +379,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
// Should not throw NoSuchElementException
it.next()
}
+ sc.stop()
}
test("spilling with null keys and values") {
- val conf = createSparkConf(true)
+ val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
- val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
- createCombiner, mergeValue, mergeCombiners)
+ val map = createExternalMap[Int]
map.insertAll((1 to 100000).iterator.map(i => (i, i)))
map.insert(null.asInstanceOf[Int], 1)
@@ -375,6 +398,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
// Should not throw NullPointerException
it.next()
}
+ sc.stop()
}
}