author     Sean Owen <sowen@cloudera.com>          2015-02-19 15:35:23 -0800
committer  Andrew Or <andrew@databricks.com>       2015-02-19 15:35:23 -0800
commit     34b7c35380c88569a1396fb4ed991a0bed4288e7 (patch)
tree       deb25f1dd88477aff0dc19904cbdc9aedc0cc7d8 /core
parent     ad6b169dee84df175b51933b7a3ad7f0bbc52cf3 (diff)
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names.

Author: Sean Owen <sowen@cloudera.com>

Closes #4514 from srowen/SPARK-4682 and squashes the following commits:

5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
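For orientation, a minimal sketch (not part of the patch) of the consolidated API the diffs below settle on; Timeout, TimeoutExample, and the org.apache.spark.example package are hypothetical names, the package being needed only because the util clock classes are private[spark]:

package org.apache.spark.example

import org.apache.spark.util.{Clock, ManualClock, SystemClock}

// Hypothetical component showing the injection pattern this patch standardizes:
// production code defaults to SystemClock, tests substitute a ManualClock.
class Timeout(timeoutMs: Long, clock: Clock = new SystemClock()) {
  private val deadline = clock.getTimeMillis() + timeoutMs
  def expired: Boolean = clock.getTimeMillis() >= deadline
}

object TimeoutExample {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(0L)
    val t = new Timeout(1000L, clock)
    assert(!t.expired)
    clock.advance(1500L)   // drive time forward deterministically instead of sleeping
    assert(t.expired)
  }
}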
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala       | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala      | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala          | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala        | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Clock.scala                      | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ManualClock.scala                | 69
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala  | 65
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala  |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala   | 36
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FakeClock.scala                  | 26
10 files changed, 188 insertions(+), 136 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 998695b6ac..21c6e6ffa6 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.collection.mutable
import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private val intervalMillis: Long = 100
// Clock used to schedule when executors should be added and removed
- private var clock: Clock = new RealClock
+ private var clock: Clock = new SystemClock()
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
}
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
- def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
- override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
- private var time: Long = startTimeMillis
- override def getTimeMillis: Long = time
- def tick(ms: Long): Unit = { time += ms }
-}
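Note that the removed RealClock was backed by System.nanoTime, whereas the shared SystemClock reads System.currentTimeMillis. The manager otherwise keeps the same shape: a mutable clock field plus a setter that the suite further down uses to install a ManualClock. A minimal sketch of that pattern, where PollingAgent is a hypothetical stand-in for the manager:

package org.apache.spark.example   // hypothetical; required because Clock is private[spark]

import org.apache.spark.util.{Clock, SystemClock}

// Sketch of the clock-injection pattern ExecutorAllocationManager relies on after this change:
// a mutable clock defaulting to SystemClock, plus a setter used by tests to install a ManualClock.
private[spark] class PollingAgent {
  private var clock: Clock = new SystemClock()

  def setClock(newClock: Clock): Unit = {
    clock = newClock
  }

  def now(): Long = clock.getTimeMillis()
}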
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b964a09bdb..e16bccb24d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
import java.io._
import scala.collection.JavaConversions._
-import scala.collection.Map
import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
+import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.util.{Clock, SystemClock}
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
- private var clock = new Clock {
- def currentTimeMillis(): Long = System.currentTimeMillis()
- }
+ private var clock: Clock = new SystemClock()
private var sleeper = new Sleeper {
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
}
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
initialize(process.get)
}
- val processStart = clock.currentTimeMillis()
+ val processStart = clock.getTimeMillis()
val exitCode = process.get.waitFor()
- if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+ if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
@@ -208,10 +205,6 @@ private[spark] class DriverRunner(
}
}
-private[deploy] trait Clock {
- def currentTimeMillis(): Long
-}
-
private[deploy] trait Sleeper {
def sleep(seconds: Int)
}
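A minimal sketch of the elapsed-time check the runner performs around process.get.waitFor(), now using the shared Clock; ranLongEnough, runProcess, and ElapsedTimeSketch are hypothetical names:

package org.apache.spark.example   // hypothetical; required because Clock is private[spark]

import org.apache.spark.util.{Clock, SystemClock}

object ElapsedTimeSketch {
  // If the driver process stayed up longer than successfulRunDurationSeconds,
  // DriverRunner resets its retry backoff (waitSeconds) to 1.
  def ranLongEnough(runProcess: () => Int,
                    successfulRunDurationSeconds: Int,
                    clock: Clock = new SystemClock()): Boolean = {
    val processStart = clock.getTimeMillis()
    runProcess()                                 // blocks, like process.get.waitFor()
    clock.getTimeMillis() - processStart > successfulRunDurationSeconds * 1000L
  }
}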
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8b62d2405e..c58721c2c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
- clock: org.apache.spark.util.Clock = SystemClock)
+ clock: Clock = new SystemClock())
extends Logging {
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
}
}
@@ -706,7 +706,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
@@ -745,7 +745,7 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
- val jobSubmissionTime = clock.getTime()
+ val jobSubmissionTime = clock.getTimeMillis()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
- stage.latestInfo.submissionTime = Some(clock.getTime())
+ stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(
def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
val serviceTime = stage.latestInfo.submissionTime match {
- case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+ case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
- stage.latestInfo.completionTime = Some(clock.getTime())
+ stage.latestInfo.completionTime = Some(clock.getTimeMillis())
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@ class DAGScheduler(
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
- SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+ SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
}
val dependentJobs: Seq[ActiveJob] =
activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
- failedStage.latestInfo.completionTime = Some(clock.getTime())
+ failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
for (job <- dependentJobs) {
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
}
@@ -1242,7 +1242,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 99a5f71177..529237f0d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
- clock: Clock = SystemClock)
+ clock: Clock = new SystemClock())
extends Schedulable with Logging {
val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
- var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
+ var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
override def schedulableQueue = null
@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
val failed = failedExecutors.get(taskId).get
return failed.contains(execId) &&
- clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+ clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}
false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
: Option[TaskDescription] =
{
if (!isZombie) {
- val curTime = clock.getTime()
+ val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
}
// Serialize and return the task
- val startTime = clock.getTime()
+ val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
return
}
val key = ef.description
- val now = clock.getTime()
+ val now = clock.getTimeMillis()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
- put(info.executorId, clock.getTime())
+ put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
- val time = clock.getTime()
+ val time = clock.getTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
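The speculation hunk above samples the clock once and compares running tasks against a median-derived threshold. A standalone sketch of that computation; the 1.5 multiplier is an assumption matching Spark's spark.speculation.multiplier default and is not part of this diff:

import java.util.Arrays

object SpeculationSketch {
  // Median-based threshold: tasks running longer than multiplier * median of the
  // successful task durations become candidates for speculative re-execution.
  def speculationThreshold(successfulDurationsMs: Array[Long], multiplier: Double = 1.5): Double = {
    val durations = successfulDurationsMs.clone()
    Arrays.sort(durations)
    val medianDuration =
      durations(math.min((0.5 * durations.length).round.toInt, durations.length - 1))
    multiplier * medianDuration
  }
}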
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala
index 97c2b45aab..e92ed11bd1 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -21,9 +21,47 @@ package org.apache.spark.util
* An interface to represent clocks, so that they can be mocked out in unit tests.
*/
private[spark] trait Clock {
- def getTime(): Long
+ def getTimeMillis(): Long
+ def waitTillTime(targetTime: Long): Long
}
-private[spark] object SystemClock extends Clock {
- def getTime(): Long = System.currentTimeMillis()
+/**
+ * A clock backed by the actual time from the OS as reported by the `System` API.
+ */
+private[spark] class SystemClock extends Clock {
+
+ val minPollTime = 25L
+
+ /**
+ * @return the same time (milliseconds since the epoch)
+ * as is reported by `System.currentTimeMillis()`
+ */
+ def getTimeMillis(): Long = System.currentTimeMillis()
+
+ /**
+ * @param targetTime block until the current time is at least this value
+ * @return current system time when wait has completed
+ */
+ def waitTillTime(targetTime: Long): Long = {
+ var currentTime = 0L
+ currentTime = System.currentTimeMillis()
+
+ var waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+
+ val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+
+ while (true) {
+ currentTime = System.currentTimeMillis()
+ waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+ val sleepTime = math.min(waitTime, pollTime)
+ Thread.sleep(sleepTime)
+ }
+ -1
+ }
}
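A small usage sketch for the trait above: waitTillTime blocks until the clock reads at least the target time, which for SystemClock means sleeping in bounded polls (at least minPollTime, otherwise roughly a tenth of the remaining wait). WaitTillTimeSketch and the package name are hypothetical:

package org.apache.spark.example   // hypothetical; required because the clock classes are private[spark]

import org.apache.spark.util.SystemClock

object WaitTillTimeSketch {
  def main(args: Array[String]): Unit = {
    val clock = new SystemClock()
    val deadline = clock.getTimeMillis() + 200L
    val wokenAt = clock.waitTillTime(deadline)   // sleeps in small increments until the deadline passes
    assert(wokenAt >= deadline)
  }
}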
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
new file mode 100644
index 0000000000..cf89c1782f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A `Clock` whose time can be manually set and modified. Its reported time does not change
+ * as time elapses, but only as its time is modified by callers. This is mainly useful for
+ * testing.
+ *
+ * @param time initial time (in milliseconds since the epoch)
+ */
+private[spark] class ManualClock(private var time: Long) extends Clock {
+
+ /**
+ * @return `ManualClock` with initial time 0
+ */
+ def this() = this(0L)
+
+ def getTimeMillis(): Long =
+ synchronized {
+ time
+ }
+
+ /**
+ * @param timeToSet new time (in milliseconds) that the clock should represent
+ */
+ def setTime(timeToSet: Long) =
+ synchronized {
+ time = timeToSet
+ notifyAll()
+ }
+
+ /**
+ * @param timeToAdd time (in milliseconds) to add to the clock's time
+ */
+ def advance(timeToAdd: Long) =
+ synchronized {
+ time += timeToAdd
+ notifyAll()
+ }
+
+ /**
+ * @param targetTime block until the clock time is set or advanced to at least this time
+ * @return current time reported by the clock when waiting finishes
+ */
+ def waitTillTime(targetTime: Long): Long =
+ synchronized {
+ while (time < targetTime) {
+ wait(100)
+ }
+ getTimeMillis()
+ }
+
+}
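A usage sketch for the new class: waitTillTime blocks (re-checking its monitor every 100 ms) until another caller sets or advances the time to at least the target. ManualClockSketch and the package name are hypothetical:

package org.apache.spark.example   // hypothetical; required because ManualClock is private[spark]

import org.apache.spark.util.ManualClock

object ManualClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(1000L)

    // The waiter blocks inside waitTillTime until the clock passes 2000 ms.
    val waiter = new Thread(new Runnable {
      override def run(): Unit = {
        val t = clock.waitTillTime(2000L)
        assert(t >= 2000L)
      }
    })
    waiter.start()

    clock.advance(500L)    // clock now reads 1500 ms; waiter keeps waiting
    clock.advance(600L)    // clock now reads 2100 ms; notifyAll() wakes the waiter
    waiter.join()
    assert(clock.getTimeMillis() == 2100L)
  }
}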
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index d3123e8540..abfcee7572 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -321,7 +322,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling add timer") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(8888L)
+ val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -330,21 +331,21 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onSchedulerBacklogged(manager)
val firstAddTime = addTime(manager)
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime)
onSchedulerQueueEmpty(manager)
// Restart add timer
- clock.tick(1000L)
+ clock.advance(1000L)
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val secondAddTime = addTime(manager)
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === secondAddTime) // timer is already started
assert(addTime(manager) !== firstAddTime)
@@ -353,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling remove timers") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(14444L)
+ val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -366,17 +367,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("1"))
val firstRemoveTime = removeTimes(manager)("1")
assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime)
- clock.tick(300L)
+ clock.advance(300L)
onExecutorIdle(manager, "2")
assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(400L)
+ clock.advance(400L)
onExecutorIdle(manager, "3")
assert(removeTimes(manager)("3") !== firstRemoveTime)
assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
@@ -385,7 +386,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("3"))
// Restart remove timer
- clock.tick(1000L)
+ clock.advance(1000L)
onExecutorBusy(manager, "1")
assert(removeTimes(manager).size === 2)
onExecutorIdle(manager, "1")
@@ -401,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop with no events") {
sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
@@ -410,15 +411,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(100L)
+ clock.advance(100L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(1000L)
+ clock.advance(1000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(10000L)
+ clock.advance(10000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
@@ -426,57 +427,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop add behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000 / 2)
+ clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // first timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7) // timer is canceled
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,11 +487,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onExecutorAdded(manager, "executor-3")
assert(removeTimes(manager).size === 3)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000 / 2)
+ clock.advance(executorIdleTimeout * 1000 / 2)
schedule(manager)
assert(removeTimes(manager).size === 3) // idle threshold not reached yet
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle threshold exceeded
assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
@@ -511,7 +512,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(!removeTimes(manager).contains("executor-5"))
assert(!removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 2)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle executors are removed
assert(executorsPendingToRemove(manager).size === 4)
@@ -529,7 +530,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-5"))
assert(removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 4)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty)
assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index b6f4411e05..aa6e4874ce 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -27,6 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.util.Clock
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
@@ -129,7 +130,7 @@ class DriverRunnerTest extends FunSuite {
.thenReturn(-1) // fail 3
.thenReturn(-1) // fail 4
.thenReturn(0) // success
- when(clock.currentTimeMillis())
+ when(clock.getTimeMillis())
.thenReturn(0).thenReturn(1000) // fail 1 (short)
.thenReturn(1000).thenReturn(2000) // fail 2 (short)
.thenReturn(2000).thenReturn(10000) // fail 3 (long)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59580561cb..12330d8f63 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.FakeClock
+import org.apache.spark.util.ManualClock
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -164,7 +164,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Offer a host with NO_PREF as the constraint,
@@ -213,7 +213,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// An executor that is not NODE_LOCAL should be rejected.
@@ -234,7 +234,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -263,7 +263,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2", "exec3")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
@@ -283,7 +283,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host3")),
Seq(TaskLocation("host2"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -321,7 +321,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(TaskLocation("host3"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -353,7 +353,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -370,7 +370,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -402,7 +402,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
@@ -486,7 +486,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
@@ -522,7 +522,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
@@ -611,7 +611,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2"), TaskLocation("host1")),
Seq(),
Seq(TaskLocation("host3", "execC")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
@@ -637,7 +637,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(),
Seq(TaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// node-local tasks are scheduled without delay
@@ -658,7 +658,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -678,7 +678,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -698,7 +698,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
@@ -732,7 +732,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(HostTaskLocation("host1")),
Seq(HostTaskLocation("host2")),
Seq(HDFSCacheTaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
deleted file mode 100644
index 0a45917b08..0000000000
--- a/core/src/test/scala/org/apache/spark/util/FakeClock.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-class FakeClock extends Clock {
- private var time = 0L
-
- def advance(millis: Long): Unit = time += millis
-
- def getTime(): Long = time
-}