Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala   65
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala    3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala    36
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FakeClock.scala                   26
4 files changed, 53 insertions, 77 deletions
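
In short, this change swaps the two suite-local test clocks (TestClock in ExecutorAllocationManagerSuite and FakeClock in TaskSetManagerSuite) for the shared org.apache.spark.util.ManualClock, updates the mocked Clock call in DriverRunnerTest from currentTimeMillis() to getTimeMillis(), and deletes the now-unused FakeClock.scala. A minimal sketch of how the updated suites drive time, assuming only the ManualClock calls that appear in the hunks below (the starting value and increment are illustrative):

  import org.apache.spark.util.ManualClock

  val clock = new ManualClock(8888L)      // start from a fixed, arbitrary time
  clock.advance(100L)                     // time only moves when the test says so
  assert(clock.getTimeMillis === 8988L)   // 8888 + 100
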
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index d3123e8540..abfcee7572 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -321,7 +322,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling add timer") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(8888L)
+ val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -330,21 +331,21 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onSchedulerBacklogged(manager)
val firstAddTime = addTime(manager)
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime)
onSchedulerQueueEmpty(manager)
// Restart add timer
- clock.tick(1000L)
+ clock.advance(1000L)
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val secondAddTime = addTime(manager)
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === secondAddTime) // timer is already started
assert(addTime(manager) !== firstAddTime)
@@ -353,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling remove timers") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(14444L)
+ val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -366,17 +367,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("1"))
val firstRemoveTime = removeTimes(manager)("1")
assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime)
- clock.tick(300L)
+ clock.advance(300L)
onExecutorIdle(manager, "2")
assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(400L)
+ clock.advance(400L)
onExecutorIdle(manager, "3")
assert(removeTimes(manager)("3") !== firstRemoveTime)
assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
@@ -385,7 +386,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("3"))
// Restart remove timer
- clock.tick(1000L)
+ clock.advance(1000L)
onExecutorBusy(manager, "1")
assert(removeTimes(manager).size === 2)
onExecutorIdle(manager, "1")
@@ -401,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop with no events") {
sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
@@ -410,15 +411,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(100L)
+ clock.advance(100L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(1000L)
+ clock.advance(1000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(10000L)
+ clock.advance(10000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
@@ -426,57 +427,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop add behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000 / 2)
+ clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // first timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7) // timer is canceled
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,11 +487,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onExecutorAdded(manager, "executor-3")
assert(removeTimes(manager).size === 3)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000 / 2)
+ clock.advance(executorIdleTimeout * 1000 / 2)
schedule(manager)
assert(removeTimes(manager).size === 3) // idle threshold not reached yet
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle threshold exceeded
assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
@@ -511,7 +512,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(!removeTimes(manager).contains("executor-5"))
assert(!removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 2)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle executors are removed
assert(executorsPendingToRemove(manager).size === 4)
@@ -529,7 +530,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-5"))
assert(removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 4)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty)
assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index b6f4411e05..aa6e4874ce 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -27,6 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.util.Clock
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
@@ -129,7 +130,7 @@ class DriverRunnerTest extends FunSuite {
.thenReturn(-1) // fail 3
.thenReturn(-1) // fail 4
.thenReturn(0) // success
- when(clock.currentTimeMillis())
+ when(clock.getTimeMillis())
.thenReturn(0).thenReturn(1000) // fail 1 (short)
.thenReturn(1000).thenReturn(2000) // fail 2 (short)
.thenReturn(2000).thenReturn(10000) // fail 3 (long)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59580561cb..12330d8f63 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.FakeClock
+import org.apache.spark.util.ManualClock
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -164,7 +164,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Offer a host with NO_PREF as the constraint,
@@ -213,7 +213,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// An executor that is not NODE_LOCAL should be rejected.
@@ -234,7 +234,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -263,7 +263,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2", "exec3")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
@@ -283,7 +283,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host3")),
Seq(TaskLocation("host2"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -321,7 +321,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(TaskLocation("host3"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -353,7 +353,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -370,7 +370,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -402,7 +402,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
@@ -486,7 +486,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
@@ -522,7 +522,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
@@ -611,7 +611,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2"), TaskLocation("host1")),
Seq(),
Seq(TaskLocation("host3", "execC")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
@@ -637,7 +637,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(),
Seq(TaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// node-local tasks are scheduled without delay
@@ -658,7 +658,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -678,7 +678,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -698,7 +698,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
@@ -732,7 +732,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(HostTaskLocation("host1")),
Seq(HostTaskLocation("host2")),
Seq(HDFSCacheTaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
deleted file mode 100644
index 0a45917b08..0000000000
--- a/core/src/test/scala/org/apache/spark/util/FakeClock.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-class FakeClock extends Clock {
- private var time = 0L
-
- def advance(millis: Long): Unit = time += millis
-
- def getTime(): Long = time
-}
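
For reference, a rough sketch of the ManualClock that takes FakeClock's place, reconstructed from how the suites above use it (a constructor taking an initial time, a no-arg constructor, advance, and the Clock method getTimeMillis), plus an assumed setTime helper; it also assumes the Clock trait declares only getTimeMillis(), so the real class in org.apache.spark.util may differ in detail:

package org.apache.spark.util

// Sketch: a Clock whose time stands still until a test explicitly moves it.
class ManualClock(private var time: Long) extends Clock {

  // TaskSetManagerSuite constructs it with no arguments.
  def this() = this(0L)

  // Clock interface: report the manually controlled time.
  def getTimeMillis(): Long = time

  // Assumed convenience, not exercised in the diffs above: jump to an absolute time.
  def setTime(timeToSet: Long): Unit = { time = timeToSet }

  // Replaces FakeClock.advance and TestClock.tick: move time forward by `timeToAdd` ms.
  def advance(timeToAdd: Long): Unit = { time += timeToAdd }
}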