aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala256
1 files changed, 146 insertions, 110 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 4d7016d1e5..30780a0da7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -120,7 +120,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
- test("Master & worker web ui available") {
+ test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
@@ -144,174 +144,202 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
}
test("basic scheduling - spread out") {
- testBasicScheduling(spreadOut = true)
+ basicScheduling(spreadOut = true)
}
test("basic scheduling - no spread out") {
- testBasicScheduling(spreadOut = false)
+ basicScheduling(spreadOut = false)
}
test("scheduling with max cores - spread out") {
- testSchedulingWithMaxCores(spreadOut = true)
+ schedulingWithMaxCores(spreadOut = true)
}
test("scheduling with max cores - no spread out") {
- testSchedulingWithMaxCores(spreadOut = false)
+ schedulingWithMaxCores(spreadOut = false)
}
test("scheduling with cores per executor - spread out") {
- testSchedulingWithCoresPerExecutor(spreadOut = true)
+ schedulingWithCoresPerExecutor(spreadOut = true)
}
test("scheduling with cores per executor - no spread out") {
- testSchedulingWithCoresPerExecutor(spreadOut = false)
+ schedulingWithCoresPerExecutor(spreadOut = false)
}
test("scheduling with cores per executor AND max cores - spread out") {
- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+ schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
}
test("scheduling with cores per executor AND max cores - no spread out") {
- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+ schedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
}
- private def testBasicScheduling(spreadOut: Boolean): Unit = {
+ test("scheduling with executor limit - spread out") {
+ schedulingWithExecutorLimit(spreadOut = true)
+ }
+
+ test("scheduling with executor limit - no spread out") {
+ schedulingWithExecutorLimit(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND max cores - spread out") {
+ schedulingWithExecutorLimitAndMaxCores(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND max cores - no spread out") {
+ schedulingWithExecutorLimitAndMaxCores(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND cores per executor - spread out") {
+ schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND cores per executor - no spread out") {
+ schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND cores per executor AND max cores - spread out") {
+ schedulingWithEverything(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND cores per executor AND max cores - no spread out") {
+ schedulingWithEverything(spreadOut = false)
+ }
+
+ private def basicScheduling(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(1024)
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- val scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 10)
+ val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores === Array(10, 10, 10))
}
- private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+ private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- // With spreading out, each worker should be assigned a few cores
- if (spreadOut) {
- assert(scheduledCores(0) === 3)
- assert(scheduledCores(1) === 3)
- assert(scheduledCores(2) === 2)
- } else {
- // Without spreading out, the cores should be concentrated on the first worker
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 0)
- assert(scheduledCores(2) === 0)
- }
- // Now test the same thing with max cores > cores per worker
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
if (spreadOut) {
- assert(scheduledCores(0) === 6)
- assert(scheduledCores(1) === 5)
- assert(scheduledCores(2) === 5)
+ assert(scheduledCores1 === Array(3, 3, 2))
+ assert(scheduledCores2 === Array(6, 5, 5))
} else {
- // Without spreading out, the first worker should be fully booked,
- // and the leftover cores should spill over to the second worker only.
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores1 === Array(8, 0, 0))
+ assert(scheduledCores2 === Array(10, 6, 0))
}
}
- private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+ private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- // Each worker should end up with 4 executors with 2 cores each
- // This should be 4 because of the memory restriction on each worker
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 8)
- assert(scheduledCores(2) === 8)
- // Now test the same thing without running into the worker memory limit
- // Each worker should now end up with 5 executors with 2 cores each
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 10)
- // Now test the same thing with a cores per executor that 10 is not divisible by
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 9)
- assert(scheduledCores(1) === 9)
- assert(scheduledCores(2) === 9)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
+ assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
+ assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
}
// Sorry for the long method name!
- private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+ private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- // We should only launch two executors, each with exactly 2 cores
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ if (spreadOut) {
+ assert(scheduledCores1 === Array(2, 2, 0))
+ assert(scheduledCores2 === Array(8, 6, 6))
+ assert(scheduledCores3 === Array(6, 6, 6))
+ } else {
+ assert(scheduledCores1 === Array(4, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(9, 9, 0))
+ }
+ }
+
+ private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256)
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(10, 10, 10))
+ }
+
+ private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, maxCores = Some(16))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 2)
- assert(scheduledCores(1) === 2)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 8, 0))
+ assert(scheduledCores3 === Array(6, 5, 5))
} else {
- assert(scheduledCores(0) === 4)
- assert(scheduledCores(1) === 0)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(10, 6, 0))
+ assert(scheduledCores3 === Array(10, 6, 0))
}
- // Test max cores > number of cores per worker
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ }
+
+ private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 6)
+ assert(scheduledCores2 === Array(4, 4, 0))
} else {
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 0, 0))
}
- // Test max cores > number of cores per worker AND
- // a cores per executor that is 10 is not divisible by
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ assert(scheduledCores3 === Array(8, 8, 4))
+ }
+
+ // Everything being: executor limit + cores per executor + max cores
+ private def schedulingWithEverything(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 6)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 6)
+ assert(scheduledCores2 === Array(4, 4, 0))
+ assert(scheduledCores3 === Array(8, 4, 4))
} else {
- assert(scheduledCores(0) === 9)
- assert(scheduledCores(1) === 9)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 0, 0))
+ assert(scheduledCores3 === Array(8, 8, 0))
}
}
- // ===============================
- // | Utility methods for testing |
- // ===============================
+ // ==========================================
+ // | Utility methods and fields for testing |
+ // ==========================================
private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+ private val workerInfo = makeWorkerInfo(4096, 10)
+ private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
private def makeMaster(conf: SparkConf = new SparkConf): Master = {
val securityMgr = new SecurityManager(conf)
@@ -335,4 +363,12 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
}
+ private def scheduleExecutorsOnWorkers(
+ master: Master,
+ appInfo: ApplicationInfo,
+ workerInfos: Array[WorkerInfo],
+ spreadOut: Boolean): Array[Int] = {
+ master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
+ }
+
}