aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala65
1 files changed, 33 insertions, 32 deletions
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index d3123e8540..abfcee7572 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -321,7 +322,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling add timer") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(8888L)
+ val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -330,21 +331,21 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onSchedulerBacklogged(manager)
val firstAddTime = addTime(manager)
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime)
onSchedulerQueueEmpty(manager)
// Restart add timer
- clock.tick(1000L)
+ clock.advance(1000L)
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val secondAddTime = addTime(manager)
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === secondAddTime) // timer is already started
assert(addTime(manager) !== firstAddTime)
@@ -353,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling remove timers") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(14444L)
+ val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -366,17 +367,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("1"))
val firstRemoveTime = removeTimes(manager)("1")
assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime)
- clock.tick(300L)
+ clock.advance(300L)
onExecutorIdle(manager, "2")
assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(400L)
+ clock.advance(400L)
onExecutorIdle(manager, "3")
assert(removeTimes(manager)("3") !== firstRemoveTime)
assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
@@ -385,7 +386,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("3"))
// Restart remove timer
- clock.tick(1000L)
+ clock.advance(1000L)
onExecutorBusy(manager, "1")
assert(removeTimes(manager).size === 2)
onExecutorIdle(manager, "1")
@@ -401,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop with no events") {
sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
@@ -410,15 +411,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(100L)
+ clock.advance(100L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(1000L)
+ clock.advance(1000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(10000L)
+ clock.advance(10000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
@@ -426,57 +427,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop add behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000 / 2)
+ clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // first timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7) // timer is canceled
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,11 +487,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onExecutorAdded(manager, "executor-3")
assert(removeTimes(manager).size === 3)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000 / 2)
+ clock.advance(executorIdleTimeout * 1000 / 2)
schedule(manager)
assert(removeTimes(manager).size === 3) // idle threshold not reached yet
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle threshold exceeded
assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
@@ -511,7 +512,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(!removeTimes(manager).contains("executor-5"))
assert(!removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 2)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle executors are removed
assert(executorsPendingToRemove(manager).size === 4)
@@ -529,7 +530,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-5"))
assert(removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 4)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty)
assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)