diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2012-11-27 12:50:40 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2012-11-27 16:05:36 -0800 |
commit | 5fa868b98bfd57b4feeed127ea68635f4fd909f9 (patch) | |
tree | fd6c300ff16e7813a30d780b00be2db0029ef64b /core/src | |
parent | cd16eab0dbad9d6186f064e5d95259562eb51628 (diff) | |
download | spark-5fa868b98bfd57b4feeed127ea68635f4fd909f9.tar.gz spark-5fa868b98bfd57b4feeed127ea68635f4fd909f9.tar.bz2 spark-5fa868b98bfd57b4feeed127ea68635f4fd909f9.zip |
Tests for MapOutputTracker.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/test/scala/spark/MapOutputTrackerSuite.scala | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 4e9717d871..529445e861 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -2,6 +2,10 @@ package spark import org.scalatest.FunSuite +import akka.actor._ +import spark.scheduler.MapStatus +import spark.storage.BlockManagerId + class MapOutputTrackerSuite extends FunSuite { test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) @@ -22,4 +26,51 @@ class MapOutputTrackerSuite extends FunSuite { "size " + size + " decompressed to " + size2 + ", which is out of range") } } + + test("master start and stop") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.stop() + } + + test("master register and fetch") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.registerShuffle(10, 2) + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val compressedSize10000 = MapOutputTracker.compressSize(10000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + val size10000 = MapOutputTracker.decompressSize(compressedSize10000) + tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000), + Array(compressedSize1000, compressedSize10000))) + tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000), + Array(compressedSize10000, compressedSize1000))) + val statuses = tracker.getServerStatuses(10, 0) + assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000), + (new BlockManagerId("hostB", 1000), size10000))) + tracker.stop() + } + + test("master register and unregister and fetch") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.registerShuffle(10, 2) + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val compressedSize10000 = MapOutputTracker.compressSize(10000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + val size10000 = MapOutputTracker.decompressSize(compressedSize10000) + tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000), + Array(compressedSize1000, compressedSize1000, compressedSize1000))) + tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000), + Array(compressedSize10000, compressedSize1000, compressedSize1000))) + + // As if we had two simulatenous fetch failures + tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000)) + tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000)) + + // The remaining reduce task might try to grab the output dispite the shuffle failure; + // this should cause it to fail, and the scheduler will ignore the failure due to the + // stage already being aborted. + intercept[Exception] { tracker.getServerStatuses(10, 1) } + } } |