aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorerenavsarogullari <erenavsarogullari@gmail.com>2017-03-15 15:57:51 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2017-03-15 15:57:51 -0700
commit046b8d4aef00b0701cf7e4b99aeaf450cacb42fe (patch)
tree42736e59c3c3db5ec4a16501b5d91c904dc807be
parent54a3697f1fb562ef9ed8fed9caffc62b84763049 (diff)
downloadspark-046b8d4aef00b0701cf7e4b99aeaf450cacb42fe.tar.gz
spark-046b8d4aef00b0701cf7e4b99aeaf450cacb42fe.tar.bz2
spark-046b8d4aef00b0701cf7e4b99aeaf450cacb42fe.zip
[SPARK-18066][CORE][TESTS] Add Pool usage policies test coverage for FIFO & FAIR Schedulers
## What changes were proposed in this pull request? The following FIFO & FAIR Schedulers Pool usage cases need to have unit test coverage: - FIFO Scheduler just uses the **root pool**, so even if the `spark.scheduler.pool` property is set, the related pool is not created and `TaskSetManagers` are added to the **root pool**. - FAIR Scheduler uses the `default pool` when the `spark.scheduler.pool` property is not set. This can happen when - the `Properties` object is **null**, - the `Properties` object is **empty** (`new Properties()`), - the **default pool** is set (`spark.scheduler.pool=default`). - FAIR Scheduler creates a **new pool** with **default values** when the `spark.scheduler.pool` property points to a **non-existent** pool. This can happen when the **scheduler allocation file** is not set or it does not contain the related pool. ## How was this patch tested? New unit tests are added. Author: erenavsarogullari <erenavsarogullari@gmail.com> Closes #15604 from erenavsarogullari/SPARK-18066.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala97
2 files changed, 96 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index e53c4fb5b4..20cedaf060 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -191,8 +191,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
- logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
- poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+ logWarning(s"A job was submitted with scheduler pool $poolName, which has not been " +
+ "configured. This can happen when the file that pools are read from isn't set, or " +
+ s"when that file doesn't contain $poolName. Created $poolName with default " +
+ s"configuration (schedulingMode: $DEFAULT_SCHEDULING_MODE, " +
+ s"minShare: $DEFAULT_MINIMUM_SHARE, weight: $DEFAULT_WEIGHT)")
}
}
parentPool.addSchedulable(manager)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 520736ab64..cddff3dd35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -31,6 +31,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val LOCAL = "local"
val APP_NAME = "PoolSuite"
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+ val TEST_POOL = "testPool"
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
@@ -40,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
}
- def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) {
+ def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
val taskSetQueue = rootPool.getSortedTaskSetQueue
val nextTaskSetToSchedule =
taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
@@ -201,12 +202,96 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)
}
+ /**
+ * spark.scheduler.pool property should be ignored for the FIFO scheduler,
+ * because pools are only needed for fair scheduling.
+ */
+ test("FIFO scheduler uses root pool and not spark.scheduler.pool property") {
+ sc = new SparkContext("local", "PoolSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+
+ val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0)
+ val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+
+ val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
+ val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
+
+ val properties = new Properties()
+ properties.setProperty("spark.scheduler.pool", TEST_POOL)
+
+ // When FIFO Scheduler is used and task sets are submitted, they should be added to
+ // the root pool, and no additional pools should be created
+ // (even though there's a configured default pool).
+ schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
+ schedulableBuilder.addTaskSetManager(taskSetManager1, properties)
+
+ assert(rootPool.getSchedulableByName(TEST_POOL) === null)
+ assert(rootPool.schedulableQueue.size === 2)
+ assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)
+ assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
+ }
+
+ test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") {
+ sc = new SparkContext("local", "PoolSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+
+ val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ schedulableBuilder.buildPools()
+
+ // Submit a new task set manager with pool properties set to null. This should result
+ // in the task set manager getting added to the default pool.
+ val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
+ schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+
+ val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
+ assert(defaultPool !== null)
+ assert(defaultPool.schedulableQueue.size === 1)
+ assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)
+
+ // When a task set manager is submitted with spark.scheduler.pool unset, it should be added to
+ // the default pool (as above).
+ val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
+ schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())
+
+ assert(defaultPool.schedulableQueue.size === 2)
+ assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
+ }
+
+ test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points to " +
+ "a non-existent pool") {
+ sc = new SparkContext("local", "PoolSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+
+ val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ schedulableBuilder.buildPools()
+
+ assert(rootPool.getSchedulableByName(TEST_POOL) === null)
+
+ val taskSetManager = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
+
+ val properties = new Properties()
+ properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, TEST_POOL)
+
+ // The fair scheduler should create a new pool with default values when spark.scheduler.pool
+ // points to a pool that doesn't exist yet (this can happen when the file that pools are read
+ // from isn't set, or when that file doesn't contain the pool name specified
+ // by spark.scheduler.pool).
+ schedulableBuilder.addTaskSetManager(taskSetManager, properties)
+
+ verifyPool(rootPool, TEST_POOL, schedulableBuilder.DEFAULT_MINIMUM_SHARE,
+ schedulableBuilder.DEFAULT_WEIGHT, schedulableBuilder.DEFAULT_SCHEDULING_MODE)
+ val testPool = rootPool.getSchedulableByName(TEST_POOL)
+ assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
+ }
+
private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
- assert(rootPool.getSchedulableByName(poolName) != null)
- assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare)
- assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight)
- assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode)
+ val selectedPool = rootPool.getSchedulableByName(poolName)
+ assert(selectedPool !== null)
+ assert(selectedPool.minShare === expectedInitMinShare)
+ assert(selectedPool.weight === expectedInitWeight)
+ assert(selectedPool.schedulingMode === expectedSchedulingMode)
}
-
}