aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-10 16:25:01 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-10 16:25:01 -0800
commitd37408f39ca3fd94f45b50a65f919f4d7007a533 (patch)
tree156e7f6639c22f919a932db2a9b90e803d26c94d /core/src/test
parent0eaf01c5ed856c9aeb60c0841c3be9305c6da174 (diff)
parent2e393cd5fdfbf3a85fced370b5c42315e86dad49 (diff)
downloadspark-d37408f39ca3fd94f45b50a65f919f4d7007a533.tar.gz
spark-d37408f39ca3fd94f45b50a65f919f4d7007a533.tar.bz2
spark-d37408f39ca3fd94f45b50a65f919f4d7007a533.zip
Merge pull request #377 from andrewor14/master
External Sorting for Aggregator and CoGroupedRDDs (Revisited) (This pull request is re-opened from https://github.com/apache/incubator-spark/pull/303, which was closed because Jenkins / github was misbehaving) The target issue for this patch is the out-of-memory exceptions triggered by aggregate operations such as reduce, groupBy, join, and cogroup. The existing AppendOnlyMap used by these operations resides purely in memory, and grows with the size of the input data until the amount of allocated memory is exceeded. Under large workloads, this problem is aggravated by the fact that OOM frequently occurs only after a very long (> 1 hour) map phase, in which case the entire job must be restarted. The solution is to spill the contents of this map to disk once a certain memory threshold is exceeded. This functionality is provided by ExternalAppendOnlyMap, which additionally sorts this buffer before writing it out to disk, and later merges these buffers back in sorted order. Under normal circumstances in which OOM is not triggered, ExternalAppendOnlyMap is simply a wrapper around AppendOnlyMap and incurs little overhead. Only when the memory usage is expected to exceed the given threshold does ExternalAppendOnlyMap spill to disk.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala120
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala)46
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala230
3 files changed, 395 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
new file mode 100644
index 0000000000..93f0c6a8e6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.Random
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
+import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
+
+class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
+ val NORMAL_ERROR = 0.20
+ val HIGH_ERROR = 0.30
+
+ test("fixed size insertions") {
+ testWith[Int, Long](10000, i => (i, i.toLong))
+ testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+ testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
+ }
+
+ test("variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testWith[Int, String](10000, i => (i, randString(0, 10)))
+ testWith[Int, String](10000, i => (i, randString(0, 100)))
+ testWith[Int, String](10000, i => (i, randString(90, 100)))
+ }
+
+ test("updates") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testWith[String, Int](10000, i => (randString(0, 10000), i))
+ }
+
+ def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+ val map = new SizeTrackingAppendOnlyMap[K, V]()
+ for (i <- 0 until numElements) {
+ val (k, v) = makeElement(i)
+ map(k) = v
+ expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+ val betterEstimatedSize = SizeEstimator.estimate(obj)
+ assert(betterEstimatedSize * (1 - error) < estimatedSize,
+ s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+ assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+ s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+ }
+}
+
+object SizeTrackingAppendOnlyMapSuite {
+ // Speed test, for reproducibility of results.
+ // These could be highly non-deterministic in general, however.
+ // Results:
+ // AppendOnlyMap: 31 ms
+ // SizeTracker: 54 ms
+ // SizeEstimator: 1500 ms
+ def main(args: Array[String]) {
+ val numElements = 100000
+
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ }
+ }
+
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ map.estimateSize()
+ }
+ }
+
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]()
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass()
+ SizeEstimator.estimate(map)
+ }
+ }
+
+ println("Base: " + baseTimes)
+ println("SizeTracker (sampled): " + sampledTimes)
+ println("SizeEstimator (unsampled): " + unsampledTimes)
+ }
+
+ def time(f: => Unit): Long = {
+ val start = System.currentTimeMillis()
+ f
+ System.currentTimeMillis() - start
+ }
+
+ private class LargeDummyClass {
+ val arr = new Array[Int](100)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 7177919a58..f44442f1a5 100644
--- a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
+import java.util.Comparator
class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
@@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite {
assert(map("" + i) === "" + i)
}
}
+
+ test("destructive sort") {
+ val map = new AppendOnlyMap[String, String]()
+ for (i <- 1 to 100) {
+ map("" + i) = "" + i
+ }
+ map.update(null, "happy new year!")
+
+ try {
+ map.apply("1")
+ map.update("1", "2013")
+ map.changeValue("1", (hadValue, oldValue) => "2014")
+ map.iterator
+ } catch {
+ case e: IllegalStateException => fail()
+ }
+
+ val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
+ def compare(kv1: (String, String), kv2: (String, String)): Int = {
+ val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
+ val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+ x.compareTo(y)
+ }
+ })
+
+ // Should be sorted by key
+ assert(it.hasNext)
+ var previous = it.next()
+ assert(previous == (null, "happy new year!"))
+ previous = it.next()
+ assert(previous == ("1", "2014"))
+ while (it.hasNext) {
+ val kv = it.next()
+ assert(kv._1.toInt > previous._1.toInt)
+ previous = kv
+ }
+
+ // All subsequent calls to apply, update, changeValue and iterator should throw exception
+ intercept[AssertionError] { map.apply("1") }
+ intercept[AssertionError] { map.update("1", "2013") }
+ intercept[AssertionError] { map.changeValue("1", (hadValue, oldValue) => "2014") }
+ intercept[AssertionError] { map.iterator }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
new file mode 100644
index 0000000000..ef957bb0e5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -0,0 +1,230 @@
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+
+ override def beforeEach() {
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.externalSorting", "true")
+ sc = new SparkContext("local", "test", conf)
+ }
+
+ val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
+ val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
+ buffer += i
+ }
+ val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
+ (buf1, buf2) => {
+ buf1 ++= buf2
+ }
+
+ test("simple insert") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+
+ // Single insert
+ map.insert(1, 10)
+ var it = map.iterator
+ assert(it.hasNext)
+ val kv = it.next()
+ assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
+ assert(!it.hasNext)
+
+ // Multiple insert
+ map.insert(2, 20)
+ map.insert(3, 30)
+ it = map.iterator
+ assert(it.hasNext)
+ assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
+ (1, ArrayBuffer[Int](10)),
+ (2, ArrayBuffer[Int](20)),
+ (3, ArrayBuffer[Int](30))))
+ }
+
+ test("insert with collision") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+
+ map.insert(1, 10)
+ map.insert(2, 20)
+ map.insert(3, 30)
+ map.insert(1, 100)
+ map.insert(2, 200)
+ map.insert(1, 1000)
+ val it = map.iterator
+ assert(it.hasNext)
+ val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
+ assert(result == Set[(Int, Set[Int])](
+ (1, Set[Int](10, 100, 1000)),
+ (2, Set[Int](20, 200)),
+ (3, Set[Int](30))))
+ }
+
+ test("ordering") {
+ val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map1.insert(1, 10)
+ map1.insert(2, 20)
+ map1.insert(3, 30)
+
+ val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map2.insert(2, 20)
+ map2.insert(3, 30)
+ map2.insert(1, 10)
+
+ val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map3.insert(3, 30)
+ map3.insert(1, 10)
+ map3.insert(2, 20)
+
+ val it1 = map1.iterator
+ val it2 = map2.iterator
+ val it3 = map3.iterator
+
+ var kv1 = it1.next()
+ var kv2 = it2.next()
+ var kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+
+ kv1 = it1.next()
+ kv2 = it2.next()
+ kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+
+ kv1 = it1.next()
+ kv2 = it2.next()
+ kv3 = it3.next()
+ assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
+ assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+ }
+
+ test("null keys and values") {
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ mergeValue, mergeCombiners)
+ map.insert(1, 5)
+ map.insert(2, 6)
+ map.insert(3, 7)
+ assert(map.size === 3)
+ assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ (1, Seq[Int](5)),
+ (2, Seq[Int](6)),
+ (3, Seq[Int](7))
+ ))
+
+ // Null keys
+ val nullInt = null.asInstanceOf[Int]
+ map.insert(nullInt, 8)
+ assert(map.size === 4)
+ assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+ (1, Seq[Int](5)),
+ (2, Seq[Int](6)),
+ (3, Seq[Int](7)),
+ (nullInt, Seq[Int](8))
+ ))
+
+ // Null values
+ map.insert(4, nullInt)
+ map.insert(nullInt, nullInt)
+ assert(map.size === 5)
+ val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
+ assert(result == Set[(Int, Set[Int])](
+ (1, Set[Int](5)),
+ (2, Set[Int](6)),
+ (3, Set[Int](7)),
+ (4, Set[Int](nullInt)),
+ (nullInt, Set[Int](nullInt, 8))
+ ))
+ }
+
+ test("simple aggregator") {
+ // reduceByKey
+ val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
+ val result1 = rdd.reduceByKey(_+_).collect()
+ assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
+
+ // groupByKey
+ val result2 = rdd.groupByKey().collect()
+ assert(result2.toSet == Set[(Int, Seq[Int])]
+ ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
+ }
+
+ test("simple cogroup") {
+ val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
+ val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
+ val result = rdd1.cogroup(rdd2).collect()
+
+ result.foreach { case (i, (seq1, seq2)) =>
+ i match {
+ case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
+ case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
+ case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
+ case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
+ case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
+ }
+ }
+ }
+
+ test("spilling") {
+ // TODO: Figure out correct memory parameters to actually induce spilling
+ // System.setProperty("spark.shuffle.buffer.mb", "1")
+ // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+
+ // reduceByKey - should spill exactly 6 times
+ val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ val resultA = rddA.reduceByKey(math.max(_, _)).collect()
+ assert(resultA.length == 5000)
+ resultA.foreach { case(k, v) =>
+ k match {
+ case 0 => assert(v == 1)
+ case 2500 => assert(v == 5001)
+ case 4999 => assert(v == 9999)
+ case _ =>
+ }
+ }
+
+ // groupByKey - should spill exactly 11 times
+ val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
+ val resultB = rddB.groupByKey().collect()
+ assert(resultB.length == 2500)
+ resultB.foreach { case(i, seq) =>
+ i match {
+ case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
+ case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
+ case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
+ case _ =>
+ }
+ }
+
+ // cogroup - should spill exactly 7 times
+ val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
+ val resultC = rddC1.cogroup(rddC2).collect()
+ assert(resultC.length == 1000)
+ resultC.foreach { case(i, (seq1, seq2)) =>
+ i match {
+ case 0 =>
+ assert(seq1.toSet == Set[Int](0))
+ assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
+ case 500 =>
+ assert(seq1.toSet == Set[Int](500))
+ assert(seq2.toSet == Set[Int]())
+ case 999 =>
+ assert(seq1.toSet == Set[Int](999))
+ assert(seq2.toSet == Set[Int]())
+ case _ =>
+ }
+ }
+ }
+
+ // TODO: Test memory allocation for multiple concurrently running tasks
+}