From 1cf19760d61a5a17bd175a906d34a2940141b76d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 26 Jul 2015 13:03:13 -0700 Subject: [SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code This also fixes a small issue in the standalone Master that was uncovered by the new tests. For more detail, read the description of SPARK-9353. Author: Andrew Or Closes #7668 from andrewor14/standalone-scheduling-tests and squashes the following commits: d852faf [Andrew Or] Add tests + fix scheduling with memory limits --- .../org/apache/spark/deploy/master/Master.scala | 8 +- .../apache/spark/deploy/master/MasterSuite.scala | 199 ++++++++++++++++++++- 2 files changed, 202 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 029f94d102..51b3f0dead 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -559,7 +559,7 @@ private[master] class Master( * allocated at a time, 12 cores from each worker would be assigned to each executor. * Since 12 < 16, no executors would launch [SPARK-8881]. */ - private[master] def scheduleExecutorsOnWorkers( + private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { @@ -585,7 +585,11 @@ private[master] class Master( while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { coresToAssign -= coresPerExecutor assignedCores(pos) += coresPerExecutor - assignedMemory(pos) += memoryPerExecutor + // If cores per executor is not set, we are assigning 1 core at a time + // without actually meaning to launch 1 executor for each core assigned + if (app.desc.coresPerExecutor.isDefined) { + assignedMemory(pos) += memoryPerExecutor + } // Spreading out an application means spreading out its executors across as // many workers as possible. If we are not spreading out, then we should keep diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index a8fbaf1d9d..4d7016d1e5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -25,14 +25,15 @@ import scala.language.postfixOps import org.json4s._ import org.json4s.jackson.JsonMethods._ -import org.scalatest.Matchers +import org.scalatest.{Matchers, PrivateMethodTester} import org.scalatest.concurrent.Eventually import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy._ +import org.apache.spark.rpc.RpcEnv -class MasterSuite extends SparkFunSuite with Matchers with Eventually { +class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester { test("can use a custom recovery mode factory") { val conf = new SparkConf(loadDefaults = false) @@ -142,4 +143,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually { } } + test("basic scheduling - spread out") { + testBasicScheduling(spreadOut = true) + } + + test("basic scheduling - no spread out") { + testBasicScheduling(spreadOut = false) + } + + test("scheduling with max cores - spread out") { + testSchedulingWithMaxCores(spreadOut = true) + } + + test("scheduling with max cores - no spread out") { + testSchedulingWithMaxCores(spreadOut = false) + } + + test("scheduling with cores per executor - spread out") { + testSchedulingWithCoresPerExecutor(spreadOut = true) + } + + test("scheduling with cores per executor - no spread out") { + testSchedulingWithCoresPerExecutor(spreadOut = false) + } + + test("scheduling with cores per executor AND max cores - spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) + } + + test("scheduling with cores per executor AND max cores - no spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) + } + + private def testBasicScheduling(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(1024) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + val scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + } + + private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) + val appInfo2 = makeAppInfo(1024, maxCores = Some(16)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + var scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + // With spreading out, each worker should be assigned a few cores + if (spreadOut) { + assert(scheduledCores(0) === 3) + assert(scheduledCores(1) === 3) + assert(scheduledCores(2) === 2) + } else { + // Without spreading out, the cores should be concentrated on the first worker + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Now test the same thing with max cores > cores per worker + scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 5) + assert(scheduledCores(2) === 5) + } else { + // Without spreading out, the first worker should be fully booked, + // and the leftover cores should spill over to the second worker only. + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 0) + } + } + + private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // Each worker should end up with 4 executors with 2 cores each + // This should be 4 because of the memory restriction on each worker + var scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 8) + assert(scheduledCores(2) === 8) + // Now test the same thing without running into the worker memory limit + // Each worker should now end up with 5 executors with 2 cores each + scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + // Now test the same thing with a cores per executor that 10 is not divisible by + scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 9) + } + + // Sorry for the long method name! + private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // We should only launch two executors, each with exactly 2 cores + var scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 2) + assert(scheduledCores(1) === 2) + assert(scheduledCores(2) === 0) + } else { + assert(scheduledCores(0) === 4) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker + scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker AND + // a cores per executor that is 10 is not divisible by + scheduledCores = master.invokePrivate( + _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 0) + } + } + + // =============================== + // | Utility methods for testing | + // =============================== + + private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers) + + private def makeMaster(conf: SparkConf = new SparkConf): Master = { + val securityMgr = new SecurityManager(conf) + val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 7077, conf, securityMgr) + val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf) + master + } + + private def makeAppInfo( + memoryPerExecutorMb: Int, + coresPerExecutor: Option[Int] = None, + maxCores: Option[Int] = None): ApplicationInfo = { + val desc = new ApplicationDescription( + "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor) + val appId = System.currentTimeMillis.toString + new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) + } + + private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { + val workerId = System.currentTimeMillis.toString + new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") + } + } -- cgit v1.2.3