Diffstat (limited to 'core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 58
1 file changed, 52 insertions(+), 6 deletions(-)
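For context: this change threads a SparkConf into MapOutputTrackerMasterActor so the actor can refuse to send a reply larger than the configured Akka frame size. Below is a minimal sketch of the actor-side guard the new tests exercise; it assumes a helper like AkkaUtils.maxFrameSizeBytes that converts spark.akka.frameSize (in MB) to bytes, and a tracker method that hands back already-serialized statuses. It is illustrative, not the committed implementation.

// Sketch only -- approximates the guard exercised by the new tests in this diff.
// Assumes AkkaUtils.maxFrameSizeBytes(conf) returns spark.akka.frameSize in bytes
// and that the master tracker can return already-serialized map output statuses.
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
  extends Actor with Logging {
  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

  def receive = {
    case GetMapOutputStatuses(shuffleId: Int) =>
      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
      val serializedSize = mapOutputStatuses.size
      if (serializedSize > maxAkkaFrameSize) {
        // Failing loudly here surfaces as a SparkException on the asking side,
        // rather than Akka silently dropping an oversized reply.
        throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
      }
      sender ! mapOutputStatuses
  }
}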
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 8efa072a97..a5bd72eb0a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.concurrent.Await

 import akka.actor._
+import akka.testkit.TestActorRef
 import org.scalatest.FunSuite

 import org.apache.spark.scheduler.MapStatus
@@ -51,14 +52,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor =
+ actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor =
+ actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -77,7 +80,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+ tracker.trackerActor =
+ actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -100,11 +104,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))
@@ -126,7 +132,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-      Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+       Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))

     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
     masterTracker.incrementEpoch()
@@ -136,4 +142,44 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
+
+  test("remote fetch below akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    masterActor.receive(GetMapOutputStatuses(10))
+  }
+
+  test("remote fetch exceeds akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
+    // Note that the size is hand-selected here because map output statuses are compressed before
+    // being sent.
+    masterTracker.registerShuffle(20, 100)
+    (0 until 100).foreach { i =>
+      masterTracker.registerMapOutput(20, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    }
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+  }
 }
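A back-of-the-envelope check on the hand-picked sizes in the two new tests, as a sketch: the ~123 B and ~1.1 MB figures are taken from the test comments above, and the MB-to-bytes conversion is an assumption about how spark.akka.frameSize is interpreted.

// Illustrative arithmetic only; the reply sizes come from the test comments,
// and the 1 MB = 1024 * 1024 bytes conversion is an assumption.
object FrameSizeArithmetic extends App {
  val maxFrameBytes = 1 * 1024 * 1024         // spark.akka.frameSize = "1" (MB) => 1048576 bytes

  val smallReply = 123                        // one 10-byte status serializes to roughly 123 B
  val largeReply = (1.1 * 1024 * 1024).toInt  // 100 statuses of 4,000,000 zero bytes compress to ~1.1 MB

  println(s"small reply fits: ${smallReply <= maxFrameBytes}")  // true  -> receive() returns normally
  println(s"large reply fits: ${largeReply <= maxFrameBytes}")  // false -> the guard throws SparkException
}

Note the design choice in the tests themselves: grabbing underlyingActor from a TestActorRef and invoking receive directly runs the handler on the calling thread, so the thrown SparkException propagates straight into intercept instead of being routed to the actor system's supervisor.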