author     Andrew Or <andrew@databricks.com>    2015-07-26 13:03:13 -0700
committer  Andrew Or <andrew@databricks.com>    2015-07-26 13:03:13 -0700
commit     1cf19760d61a5a17bd175a906d34a2940141b76d (patch)
tree       8b5a40e04c067f72f1946791d01712618050f540 /core
parent     fb5d43fb2529d78d55f1fe8d365191c946153640 (diff)
[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code
This also fixes a small issue in the standalone Master that was uncovered by the new tests.
For more detail, read the description of SPARK-9353.

Author: Andrew Or <andrew@databricks.com>

Closes #7668 from andrewor14/standalone-scheduling-tests and squashes the following commits:

d852faf [Andrew Or] Add tests + fix scheduling with memory limits
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala       |   8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala  | 199
2 files changed, 202 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 029f94d102..51b3f0dead 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -559,7 +559,7 @@ private[master] class Master(
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
- private[master] def scheduleExecutorsOnWorkers(
+ private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
@@ -585,7 +585,11 @@ private[master] class Master(
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
- assignedMemory(pos) += memoryPerExecutor
+ // If cores per executor is not set, we are assigning 1 core at a time
+ // without actually meaning to launch 1 executor for each core assigned
+ if (app.desc.coresPerExecutor.isDefined) {
+ assignedMemory(pos) += memoryPerExecutor
+ }
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
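
To make the effect of the change above concrete, here is a minimal, standalone sketch of the memory accounting in the scheduling loop. It is not the actual Master implementation; the names (assignCores, memoryPerExecutorMb, and so on) are invented for illustration, and the numbers mirror the new tests (a 10-core, 4096 MB worker and 1024 MB per executor).

// Simplified illustration only; not the real scheduleExecutorsOnWorkers.
object SchedulingSketch {
  def assignCores(
      coresToAssign: Int,
      workerCores: Int,
      workerMemoryMb: Int,
      memoryPerExecutorMb: Int,
      coresPerExecutor: Option[Int]): Int = {
    // When coresPerExecutor is unset, cores are handed out one at a time,
    // but only ONE executor is ultimately launched on the worker.
    val coresPerStep = coresPerExecutor.getOrElse(1)
    var remaining = coresToAssign
    var assignedCores = 0
    var assignedMemoryMb = 0
    def canLaunch: Boolean =
      workerCores - assignedCores >= coresPerStep &&
        workerMemoryMb - assignedMemoryMb >= memoryPerExecutorMb
    while (remaining >= coresPerStep && canLaunch) {
      assignedCores += coresPerStep
      remaining -= coresPerStep
      // The fix: charge the worker's memory once per executor, i.e. only when
      // each scheduling step really corresponds to a new executor.
      if (coresPerExecutor.isDefined) {
        assignedMemoryMb += memoryPerExecutorMb
      }
    }
    assignedCores
  }

  def main(args: Array[String]): Unit = {
    // No coresPerExecutor: all 10 cores are assigned, because the single
    // executor's memory is not charged once per core. Without the isDefined
    // guard this would stop at 4 cores (4 * 1024 MB exhausts 4096 MB).
    println(assignCores(10, 10, 4096, 1024, None))    // 10
    // 2 cores per executor: memory limits the worker to 4 executors = 8 cores,
    // matching the "scheduling with cores per executor" expectation below.
    println(assignCores(10, 10, 4096, 1024, Some(2))) // 8
  }
}
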
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index a8fbaf1d9d..4d7016d1e5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -25,14 +25,15 @@ import scala.language.postfixOps
import org.json4s._
import org.json4s.jackson.JsonMethods._
-import org.scalatest.Matchers
+import org.scalatest.{Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
+import org.apache.spark.rpc.RpcEnv
-class MasterSuite extends SparkFunSuite with Matchers with Eventually {
+class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
test("can use a custom recovery mode factory") {
val conf = new SparkConf(loadDefaults = false)
@@ -142,4 +143,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
}
}
+ test("basic scheduling - spread out") {
+ testBasicScheduling(spreadOut = true)
+ }
+
+ test("basic scheduling - no spread out") {
+ testBasicScheduling(spreadOut = false)
+ }
+
+ test("scheduling with max cores - spread out") {
+ testSchedulingWithMaxCores(spreadOut = true)
+ }
+
+ test("scheduling with max cores - no spread out") {
+ testSchedulingWithMaxCores(spreadOut = false)
+ }
+
+ test("scheduling with cores per executor - spread out") {
+ testSchedulingWithCoresPerExecutor(spreadOut = true)
+ }
+
+ test("scheduling with cores per executor - no spread out") {
+ testSchedulingWithCoresPerExecutor(spreadOut = false)
+ }
+
+ test("scheduling with cores per executor AND max cores - spread out") {
+ testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+ }
+
+ test("scheduling with cores per executor AND max cores - no spread out") {
+ testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+ }
+
+ private def testBasicScheduling(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(1024)
+ val workerInfo = makeWorkerInfo(4096, 10)
+ val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+ val scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ assert(scheduledCores(0) === 10)
+ assert(scheduledCores(1) === 10)
+ assert(scheduledCores(2) === 10)
+ }
+
+ private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
+ val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
+ val workerInfo = makeWorkerInfo(4096, 10)
+ val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+ var scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ // With spreading out, each worker should be assigned a few cores
+ if (spreadOut) {
+ assert(scheduledCores(0) === 3)
+ assert(scheduledCores(1) === 3)
+ assert(scheduledCores(2) === 2)
+ } else {
+ // Without spreading out, the cores should be concentrated on the first worker
+ assert(scheduledCores(0) === 8)
+ assert(scheduledCores(1) === 0)
+ assert(scheduledCores(2) === 0)
+ }
+ // Now test the same thing with max cores > cores per worker
+ scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ if (spreadOut) {
+ assert(scheduledCores(0) === 6)
+ assert(scheduledCores(1) === 5)
+ assert(scheduledCores(2) === 5)
+ } else {
+ // Without spreading out, the first worker should be fully booked,
+ // and the leftover cores should spill over to the second worker only.
+ assert(scheduledCores(0) === 10)
+ assert(scheduledCores(1) === 6)
+ assert(scheduledCores(2) === 0)
+ }
+ }
+
+ private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
+ val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
+ val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
+ val workerInfo = makeWorkerInfo(4096, 10)
+ val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+ // Each worker should end up with 4 executors with 2 cores each
+ // This should be 4 because of the memory restriction on each worker
+ var scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ assert(scheduledCores(0) === 8)
+ assert(scheduledCores(1) === 8)
+ assert(scheduledCores(2) === 8)
+ // Now test the same thing without running into the worker memory limit
+ // Each worker should now end up with 5 executors with 2 cores each
+ scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ assert(scheduledCores(0) === 10)
+ assert(scheduledCores(1) === 10)
+ assert(scheduledCores(2) === 10)
+ // Now test the same thing with a cores per executor that 10 is not divisible by
+ scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ assert(scheduledCores(0) === 9)
+ assert(scheduledCores(1) === 9)
+ assert(scheduledCores(2) === 9)
+ }
+
+ // Sorry for the long method name!
+ private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
+ val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
+ val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
+ val workerInfo = makeWorkerInfo(4096, 10)
+ val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+ // We should only launch two executors, each with exactly 2 cores
+ var scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ if (spreadOut) {
+ assert(scheduledCores(0) === 2)
+ assert(scheduledCores(1) === 2)
+ assert(scheduledCores(2) === 0)
+ } else {
+ assert(scheduledCores(0) === 4)
+ assert(scheduledCores(1) === 0)
+ assert(scheduledCores(2) === 0)
+ }
+ // Test max cores > number of cores per worker
+ scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ if (spreadOut) {
+ assert(scheduledCores(0) === 8)
+ assert(scheduledCores(1) === 6)
+ assert(scheduledCores(2) === 6)
+ } else {
+ assert(scheduledCores(0) === 10)
+ assert(scheduledCores(1) === 10)
+ assert(scheduledCores(2) === 0)
+ }
+ // Test max cores > number of cores per worker AND
+ // a cores per executor that 10 is not divisible by
+ scheduledCores = master.invokePrivate(
+ _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
+ assert(scheduledCores.length === 3)
+ if (spreadOut) {
+ assert(scheduledCores(0) === 6)
+ assert(scheduledCores(1) === 6)
+ assert(scheduledCores(2) === 6)
+ } else {
+ assert(scheduledCores(0) === 9)
+ assert(scheduledCores(1) === 9)
+ assert(scheduledCores(2) === 0)
+ }
+ }
+
+ // ===============================
+ // | Utility methods for testing |
+ // ===============================
+
+ private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+
+ private def makeMaster(conf: SparkConf = new SparkConf): Master = {
+ val securityMgr = new SecurityManager(conf)
+ val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 7077, conf, securityMgr)
+ val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf)
+ master
+ }
+
+ private def makeAppInfo(
+ memoryPerExecutorMb: Int,
+ coresPerExecutor: Option[Int] = None,
+ maxCores: Option[Int] = None): ApplicationInfo = {
+ val desc = new ApplicationDescription(
+ "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
+ val appId = System.currentTimeMillis.toString
+ new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
+ }
+
+ private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
+ val workerId = System.currentTimeMillis.toString
+ new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
+ }
+
}
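
For readers unfamiliar with ScalaTest's PrivateMethodTester, which is how the new suite reaches the private scheduleExecutorsOnWorkers method, here is a minimal self-contained example; the Greeter class and greet method are invented purely for illustration.

import org.scalatest.{FunSuite, PrivateMethodTester}

class Greeter {
  private def greet(name: String): String = s"Hello, $name"
}

class GreeterSuite extends FunSuite with PrivateMethodTester {
  test("invoke a private method reflectively") {
    // Describe the private method by its name and result type...
    val greet = PrivateMethod[String]('greet)
    // ...then call it on an instance via the invokePrivate DSL.
    val greeting = new Greeter() invokePrivate greet("Spark")
    assert(greeting === "Hello, Spark")
  }
}

This is the same pattern as _scheduleExecutorsOnWorkers and master.invokePrivate(...) in the suite above, reduced to its essentials.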