-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Clock.scala | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ManualClock.scala | 69
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 65
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 36
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FakeClock.scala | 26
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 2
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 7
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 2
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 2
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 2
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala | 10
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 2
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala | 25
-rw-r--r--  mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java | 2
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java | 2
-rw-r--r--  project/MimaExcludes.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala | 89
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala | 5
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 33
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 37
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 10
37 files changed, 301 insertions, 337 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 998695b6ac..21c6e6ffa6 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.collection.mutable
import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private val intervalMillis: Long = 100
// Clock used to schedule when executors should be added and removed
- private var clock: Clock = new RealClock
+ private var clock: Clock = new SystemClock()
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
}
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
- def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
- override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
- private var time: Long = startTimeMillis
- override def getTimeMillis: Long = time
- def tick(ms: Long): Unit = { time += ms }
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b964a09bdb..e16bccb24d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
import java.io._
import scala.collection.JavaConversions._
-import scala.collection.Map
import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
+import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.util.{Clock, SystemClock}
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
- private var clock = new Clock {
- def currentTimeMillis(): Long = System.currentTimeMillis()
- }
+ private var clock: Clock = new SystemClock()
private var sleeper = new Sleeper {
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
}
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
initialize(process.get)
}
- val processStart = clock.currentTimeMillis()
+ val processStart = clock.getTimeMillis()
val exitCode = process.get.waitFor()
- if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+ if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
@@ -208,10 +205,6 @@ private[spark] class DriverRunner(
}
}
-private[deploy] trait Clock {
- def currentTimeMillis(): Long
-}
-
private[deploy] trait Sleeper {
def sleep(seconds: Int)
}
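
The hunk above swaps DriverRunner's local Clock trait for the shared org.apache.spark.util.Clock and times each driver run with getTimeMillis(), resetting the retry wait once a run outlives successfulRunDuration. A minimal sketch of that timing check under stated assumptions: RestartLoop and runOnce are hypothetical names, the exponential back-off branch is an illustration rather than part of the hunk, and the package line is only there because the clock classes are private[spark].

package org.apache.spark.sketches  // hypothetical package; Clock is private[spark]

import org.apache.spark.util.{Clock, SystemClock}

// Sketch of the run-duration check: a run that lasts longer than
// successfulRunDuration seconds resets the retry wait to one second.
class RestartLoop(clock: Clock = new SystemClock(), successfulRunDuration: Int = 5) {
  private var waitSeconds = 1

  def recordRun(runOnce: () => Int): Int = {
    val processStart = clock.getTimeMillis()
    val exitCode = runOnce()                  // stands in for process.get.waitFor()
    if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
      waitSeconds = 1                         // long run: reset the back-off
    } else {
      waitSeconds = waitSeconds * 2           // short run: back off further (assumption, not in the hunk)
    }
    exitCode
  }
}
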
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8b62d2405e..c58721c2c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
- clock: org.apache.spark.util.Clock = SystemClock)
+ clock: Clock = new SystemClock())
extends Logging {
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
}
}
@@ -706,7 +706,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
@@ -745,7 +745,7 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
- val jobSubmissionTime = clock.getTime()
+ val jobSubmissionTime = clock.getTimeMillis()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
- stage.latestInfo.submissionTime = Some(clock.getTime())
+ stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(
def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
val serviceTime = stage.latestInfo.submissionTime match {
- case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+ case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
- stage.latestInfo.completionTime = Some(clock.getTime())
+ stage.latestInfo.completionTime = Some(clock.getTimeMillis())
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@ class DAGScheduler(
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
- SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+ SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
}
val dependentJobs: Seq[ActiveJob] =
activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
- failedStage.latestInfo.completionTime = Some(clock.getTime())
+ failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
for (job <- dependentJobs) {
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
}
@@ -1242,7 +1242,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 99a5f71177..529237f0d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
- clock: Clock = SystemClock)
+ clock: Clock = new SystemClock())
extends Schedulable with Logging {
val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
- var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
+ var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
override def schedulableQueue = null
@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
val failed = failedExecutors.get(taskId).get
return failed.contains(execId) &&
- clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+ clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}
false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
: Option[TaskDescription] =
{
if (!isZombie) {
- val curTime = clock.getTime()
+ val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
}
// Serialize and return the task
- val startTime = clock.getTime()
+ val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
return
}
val key = ef.description
- val now = clock.getTime()
+ val now = clock.getTimeMillis()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
- put(info.executorId, clock.getTime())
+ put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
- val time = clock.getTime()
+ val time = clock.getTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala
index 97c2b45aab..e92ed11bd1 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -21,9 +21,47 @@ package org.apache.spark.util
* An interface to represent clocks, so that they can be mocked out in unit tests.
*/
private[spark] trait Clock {
- def getTime(): Long
+ def getTimeMillis(): Long
+ def waitTillTime(targetTime: Long): Long
}
-private[spark] object SystemClock extends Clock {
- def getTime(): Long = System.currentTimeMillis()
+/**
+ * A clock backed by the actual time from the OS as reported by the `System` API.
+ */
+private[spark] class SystemClock extends Clock {
+
+ val minPollTime = 25L
+
+ /**
+ * @return the same time (milliseconds since the epoch)
+ * as is reported by `System.currentTimeMillis()`
+ */
+ def getTimeMillis(): Long = System.currentTimeMillis()
+
+ /**
+ * @param targetTime block until the current time is at least this value
+ * @return current system time when wait has completed
+ */
+ def waitTillTime(targetTime: Long): Long = {
+ var currentTime = 0L
+ currentTime = System.currentTimeMillis()
+
+ var waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+
+ val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+
+ while (true) {
+ currentTime = System.currentTimeMillis()
+ waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+ val sleepTime = math.min(waitTime, pollTime)
+ Thread.sleep(sleepTime)
+ }
+ -1
+ }
}
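
As the hunk above shows, SystemClock.waitTillTime computes a poll interval of a tenth of the initial wait (but at least minPollTime, 25 ms) and sleeps in those increments until the target time is reached. A small usage sketch, assuming it sits inside the org.apache.spark package tree since the clock classes are private[spark]:

package org.apache.spark.sketches  // hypothetical package; SystemClock is private[spark]

import org.apache.spark.util.SystemClock

object WaitTillTimeSketch {
  def main(args: Array[String]): Unit = {
    val clock = new SystemClock()
    val target = clock.getTimeMillis() + 200L  // wake up roughly 200 ms from now
    val woken = clock.waitTillTime(target)     // blocks, polling the system clock
    assert(woken >= target)                    // returns the first observed time at or past the target
  }
}
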
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
new file mode 100644
index 0000000000..cf89c1782f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A `Clock` whose time can be manually set and modified. Its reported time does not change
+ * as time elapses, but only as its time is modified by callers. This is mainly useful for
+ * testing.
+ *
+ * @param time initial time (in milliseconds since the epoch)
+ */
+private[spark] class ManualClock(private var time: Long) extends Clock {
+
+ /**
+ * @return `ManualClock` with initial time 0
+ */
+ def this() = this(0L)
+
+ def getTimeMillis(): Long =
+ synchronized {
+ time
+ }
+
+ /**
+ * @param timeToSet new time (in milliseconds) that the clock should represent
+ */
+ def setTime(timeToSet: Long) =
+ synchronized {
+ time = timeToSet
+ notifyAll()
+ }
+
+ /**
+ * @param timeToAdd time (in milliseconds) to add to the clock's time
+ */
+ def advance(timeToAdd: Long) =
+ synchronized {
+ time += timeToAdd
+ notifyAll()
+ }
+
+ /**
+ * @param targetTime block until the clock time is set or advanced to at least this time
+ * @return current time reported by the clock when waiting finishes
+ */
+ def waitTillTime(targetTime: Long): Long =
+ synchronized {
+ while (time < targetTime) {
+ wait(100)
+ }
+ getTimeMillis()
+ }
+
+}
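
ManualClock only moves when setTime or advance is called, and waitTillTime blocks on the clock's monitor until the time catches up, which is what lets tests drive timers deterministically. A test-style sketch under the same private[spark] packaging assumption:

package org.apache.spark.sketches  // hypothetical package; ManualClock is private[spark]

import org.apache.spark.util.ManualClock

object ManualClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(1000L)          // start at t = 1000 ms

    // A waiter blocks until the clock is advanced to its target time.
    val waiter = new Thread(new Runnable {
      override def run(): Unit = {
        val t = clock.waitTillTime(1500L)       // returns once time >= 1500
        assert(t >= 1500L)
      }
    })
    waiter.start()

    assert(clock.getTimeMillis() == 1000L)      // time does not pass on its own
    clock.advance(300L)                         // t = 1300, waiter still blocked
    clock.advance(300L)                         // t = 1600, waiter wakes up
    waiter.join()
  }
}
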
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index d3123e8540..abfcee7572 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -321,7 +322,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling add timer") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(8888L)
+ val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -330,21 +331,21 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onSchedulerBacklogged(manager)
val firstAddTime = addTime(manager)
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime)
onSchedulerQueueEmpty(manager)
// Restart add timer
- clock.tick(1000L)
+ clock.advance(1000L)
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val secondAddTime = addTime(manager)
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === secondAddTime) // timer is already started
assert(addTime(manager) !== firstAddTime)
@@ -353,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("starting/canceling remove timers") {
sc = createSparkContext(2, 10)
- val clock = new TestClock(14444L)
+ val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -366,17 +367,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("1"))
val firstRemoveTime = removeTimes(manager)("1")
assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(100L)
+ clock.advance(100L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
- clock.tick(200L)
+ clock.advance(200L)
onExecutorIdle(manager, "1")
assert(removeTimes(manager)("1") === firstRemoveTime)
- clock.tick(300L)
+ clock.advance(300L)
onExecutorIdle(manager, "2")
assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
- clock.tick(400L)
+ clock.advance(400L)
onExecutorIdle(manager, "3")
assert(removeTimes(manager)("3") !== firstRemoveTime)
assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
@@ -385,7 +386,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("3"))
// Restart remove timer
- clock.tick(1000L)
+ clock.advance(1000L)
onExecutorBusy(manager, "1")
assert(removeTimes(manager).size === 2)
onExecutorIdle(manager, "1")
@@ -401,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop with no events") {
sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
@@ -410,15 +411,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(100L)
+ clock.advance(100L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(1000L)
+ clock.advance(1000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(10000L)
+ clock.advance(10000L)
schedule(manager)
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
@@ -426,57 +427,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("mock polling loop add behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000 / 2)
+ clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // first timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7) // timer is canceled
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
- clock.tick(schedulerBacklogTimeout * 1000)
+ clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
- clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
assert(numExecutorsPending(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
sc = createSparkContext(1, 20)
- val clock = new TestClock(2020L)
+ val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,11 +487,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
onExecutorAdded(manager, "executor-3")
assert(removeTimes(manager).size === 3)
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000 / 2)
+ clock.advance(executorIdleTimeout * 1000 / 2)
schedule(manager)
assert(removeTimes(manager).size === 3) // idle threshold not reached yet
assert(executorsPendingToRemove(manager).isEmpty)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle threshold exceeded
assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
@@ -511,7 +512,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(!removeTimes(manager).contains("executor-5"))
assert(!removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 2)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty) // idle executors are removed
assert(executorsPendingToRemove(manager).size === 4)
@@ -529,7 +530,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-5"))
assert(removeTimes(manager).contains("executor-6"))
assert(executorsPendingToRemove(manager).size === 4)
- clock.tick(executorIdleTimeout * 1000)
+ clock.advance(executorIdleTimeout * 1000)
schedule(manager)
assert(removeTimes(manager).isEmpty)
assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index b6f4411e05..aa6e4874ce 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -27,6 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.util.Clock
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
@@ -129,7 +130,7 @@ class DriverRunnerTest extends FunSuite {
.thenReturn(-1) // fail 3
.thenReturn(-1) // fail 4
.thenReturn(0) // success
- when(clock.currentTimeMillis())
+ when(clock.getTimeMillis())
.thenReturn(0).thenReturn(1000) // fail 1 (short)
.thenReturn(1000).thenReturn(2000) // fail 2 (short)
.thenReturn(2000).thenReturn(10000) // fail 3 (long)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59580561cb..12330d8f63 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.FakeClock
+import org.apache.spark.util.ManualClock
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -164,7 +164,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Offer a host with NO_PREF as the constraint,
@@ -213,7 +213,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// An executor that is not NODE_LOCAL should be rejected.
@@ -234,7 +234,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -263,7 +263,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2", "exec3")),
Seq() // Last task has no locality prefs
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
@@ -283,7 +283,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host3")),
Seq(TaskLocation("host2"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -321,7 +321,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(TaskLocation("host3"))
)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
@@ -353,7 +353,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -370,7 +370,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -402,7 +402,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
@@ -486,7 +486,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
@@ -522,7 +522,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
@@ -611,7 +611,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2"), TaskLocation("host1")),
Seq(),
Seq(TaskLocation("host3", "execC")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
@@ -637,7 +637,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(),
Seq(TaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// node-local tasks are scheduled without delay
@@ -658,7 +658,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(TaskLocation("host2")),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -678,7 +678,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// process-local tasks are scheduled first
@@ -698,7 +698,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
@@ -732,7 +732,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq(HostTaskLocation("host1")),
Seq(HostTaskLocation("host2")),
Seq(HDFSCacheTaskLocation("host3")))
- val clock = new FakeClock
+ val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
deleted file mode 100644
index 0a45917b08..0000000000
--- a/core/src/test/scala/org/apache/spark/util/FakeClock.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-class FakeClock extends Clock {
- private var time = 0L
-
- def advance(millis: Long): Unit = time += millis
-
- def getTime(): Long = time
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index b57a1c71e3..e04d4088df 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -34,10 +34,9 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
@@ -54,7 +53,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
def beforeFunction() {
logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
}
before(beforeFunction())
@@ -236,7 +235,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
tx.commit()
tx.close()
Thread.sleep(500) // Allow some time for the events to reach
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
null
}
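
These streaming suites switch the scheduler onto the relocated ManualClock through the spark.streaming.clock property and then trigger each batch deterministically by advancing the clock one batch interval, as in the clock.advance(batchDuration.milliseconds) call above. A hedged configuration sketch; the context setup is illustrative, and getting hold of the ManualClock instance itself goes through private[streaming] scheduler state that is not shown here:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ManualClockStreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("manual-clock-sketch")
      .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")  // new class name
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... wire up input and output streams, start the context, then advance
    // the ManualClock by Seconds(1).milliseconds per batch as the suite does ...
    ssc.stop()
  }
}
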
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 0b80b611cd..588e86a188 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -18,9 +18,7 @@ package org.apache.spark.streaming.kinesis
import org.apache.spark.Logging
import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.util.SystemClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
/**
* This is a helper class for managing checkpoint clocks.
@@ -35,7 +33,7 @@ private[kinesis] class KinesisCheckpointState(
/* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
val checkpointClock = new ManualClock()
- checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+ checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
/**
* Check if it's time to checkpoint based on the current time and the derived time
@@ -44,13 +42,13 @@ private[kinesis] class KinesisCheckpointState(
* @return true if it's time to checkpoint
*/
def shouldCheckpoint(): Boolean = {
- new SystemClock().currentTime() > checkpointClock.currentTime()
+ new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
}
/**
* Advance the checkpoint clock by the checkpoint interval.
*/
def advanceCheckpoint() = {
- checkpointClock.addToTime(checkpointInterval.milliseconds)
+ checkpointClock.advance(checkpointInterval.milliseconds)
}
}
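
KinesisCheckpointState keeps the next checkpoint deadline in a ManualClock, compares it against a fresh SystemClock reading in shouldCheckpoint, and pushes it out by one interval in advanceCheckpoint. A minimal restatement of that logic, with the streaming Duration parameter replaced by a plain millisecond value and a hypothetical class name:

package org.apache.spark.sketches  // hypothetical package; the clock classes are private[spark]

import org.apache.spark.util.{Clock, ManualClock, SystemClock}

class CheckpointDeadline(checkpointIntervalMs: Long, currentClock: Clock = new SystemClock()) {

  // The deadline clock is frozen at "now + interval" until explicitly advanced.
  val checkpointClock = new ManualClock()
  checkpointClock.setTime(currentClock.getTimeMillis() + checkpointIntervalMs)

  // Time to checkpoint once wall-clock time passes the stored deadline.
  def shouldCheckpoint(): Boolean =
    new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()

  // Push the deadline out by one interval after a successful checkpoint.
  def advanceCheckpoint(): Unit =
    checkpointClock.advance(checkpointIntervalMs)
}
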
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 8ecc2d9016..af8cd875b4 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -104,7 +104,7 @@ private[kinesis] class KinesisRecordProcessor(
logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
s" records for shardId $shardId")
logDebug(s"Checkpoint: Next checkpoint is at " +
- s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+ s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
}
} catch {
case e: Throwable => {
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index f56898af02..255fe65819 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.{ManualClock, Clock}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
@@ -129,45 +128,45 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
assert(checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
assert(!checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should add to time when advancing checkpoint") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+ assert(checkpointState.checkpointClock.getTimeMillis() == (2 * checkpointIntervalMillis))
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shutdown should checkpoint if the reason is TERMINATE") {
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
index ac945ba6f2..640d2ec55e 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
@@ -47,7 +47,7 @@ public class JavaStreamingLogisticRegressionSuite implements Serializable {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
index a4dd1ac39a..899c4ea607 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
@@ -45,7 +45,7 @@ public class JavaStreamingLinearRegressionSuite implements Serializable {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4065a562a1..ee6229aa6b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -148,6 +148,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.linalg.VectorUDT"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
+ ) ++ Seq(
+ // SPARK-4682
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
)
case v if v.startsWith("1.2") =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4f7db41abe..22de8c02e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -88,7 +88,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -161,7 +161,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = clock.currentTime()
+ lastNewFileFindingTime = clock.getTimeMillis()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -174,7 +174,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = clock.currentTime() - lastNewFileFindingTime
+ val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 79263a7183..ee5e639b26 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.SystemClock
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f7a8ebee8a..dcdc27d29c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8632c94349..ac92774a38 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,8 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
-import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, ManualClock}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -45,8 +46,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clock = {
val clockClass = ssc.sc.conf.get(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ "spark.streaming.clock", "org.apache.spark.util.SystemClock")
+ try {
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ } catch {
+ case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
+ val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
+ Class.forName(newClockClass).newInstance().asInstanceOf[Clock]
+ }
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
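
The JobGenerator change above keeps old configurations working: the class named by spark.streaming.clock is loaded reflectively, and a name under the removed org.apache.spark.streaming.util package is rewritten to the new org.apache.spark.util location before retrying. A sketch of just that resolution step, with a hypothetical ClockLoader object, packaged under org.apache.spark because Clock is private[spark]:

package org.apache.spark.sketches  // hypothetical package; Clock is private[spark]

import org.apache.spark.util.Clock

object ClockLoader {
  // If the configured class name still uses the old streaming package,
  // retry with the class relocated to org.apache.spark.util before failing.
  def loadClock(clockClass: String): Clock =
    try {
      Class.forName(clockClass).newInstance().asInstanceOf[Clock]
    } catch {
      case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
        val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
        Class.forName(newClockClass).newInstance().asInstanceOf[Clock]
    }
}
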
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index e19ac939f9..200cf4ef4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.util.{Clock, Utils}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -150,7 +150,7 @@ private[streaming] class ReceivedBlockTracker(
* returns only after the files are cleaned up.
*/
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
- assert(cleanupThreshTime.milliseconds < clock.currentTime())
+ assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
deleted file mode 100644
index d6d96d7ba0..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-private[streaming]
-trait Clock {
- def currentTime(): Long
- def waitTillTime(targetTime: Long): Long
-}
-
-private[streaming]
-class SystemClock() extends Clock {
-
- val minPollTime = 25L
-
- def currentTime(): Long = {
- System.currentTimeMillis()
- }
-
- def waitTillTime(targetTime: Long): Long = {
- var currentTime = 0L
- currentTime = System.currentTimeMillis()
-
- var waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
-
- val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
-
- while (true) {
- currentTime = System.currentTimeMillis()
- waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
- val sleepTime = math.min(waitTime, pollTime)
- Thread.sleep(sleepTime)
- }
- -1
- }
-}
-
-private[streaming]
-class ManualClock() extends Clock {
-
- private var time = 0L
-
- def currentTime() = this.synchronized {
- time
- }
-
- def setTime(timeToSet: Long) = {
- this.synchronized {
- time = timeToSet
- this.notifyAll()
- }
- }
-
- def addToTime(timeToAdd: Long) = {
- this.synchronized {
- time += timeToAdd
- this.notifyAll()
- }
- }
- def waitTillTime(targetTime: Long): Long = {
- this.synchronized {
- while (time < targetTime) {
- this.wait(100)
- }
- }
- currentTime()
- }
-}
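
The deleted trait is superseded by org.apache.spark.util.{Clock, SystemClock, ManualClock}, with currentTime() renamed to getTimeMillis() and addToTime() renamed to advance(), as the call sites below reflect. A minimal sketch of the renamed API, assuming Spark-internal visibility since the core clock classes are package-private (object name and values are illustrative):

    package org.apache.spark.streaming

    import org.apache.spark.util.{ManualClock, SystemClock}

    object ClockRenameSketch {
      def main(args: Array[String]): Unit = {
        val manual = new ManualClock()
        manual.setTime(1000L)                     // unchanged
        manual.advance(500L)                      // was addToTime(500L)
        assert(manual.getTimeMillis() == 1500L)   // was currentTime()

        // SystemClock keeps wall-clock semantics under the new accessor name.
        val wall = new SystemClock()
        println("now = " + wall.getTimeMillis())
      }
    }
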
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 1a616a0434..c8eef833eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.util
import org.apache.spark.Logging
+import org.apache.spark.util.{Clock, SystemClock}
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
@@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* current system time.
*/
def getStartTime(): Long = {
- (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
/**
@@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* more than current time.
*/
def getRestartTime(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
+ val gap = clock.getTimeMillis() - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
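
With the renamed accessor the alignment math is unchanged: getStartTime() rounds the clock up to the next period boundary, and getRestartTime() does the same relative to the original start time. A small worked sketch driving the timer from a ManualClock (object name and values are illustrative; assumes Spark-internal visibility):

    package org.apache.spark.streaming.util

    import org.apache.spark.util.ManualClock

    object RecurringTimerSketch {
      def main(args: Array[String]): Unit = {
        val clock = new ManualClock()
        clock.setTime(2350L)
        val timer = new RecurringTimer(clock, 1000L, t => println("tick at " + t), "sketch-timer")

        // floor(2350 / 1000) + 1 = 3, so the first tick lands on 3000 ms.
        assert(timer.getStartTime() == 3000L)

        // At 7700 ms, a timer originally started at 3000 ms restarts on the
        // next boundary after "now": 3000 + (floor(4700 / 1000) + 1) * 1000 = 8000.
        clock.advance(5350L)
        assert(timer.getRestartTime(3000L) == 8000L)
      }
    }
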
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 166661b749..985ded9111 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,13 +19,12 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
import WriteAheadLogManager._
/**
@@ -82,7 +81,7 @@ private[streaming] class WriteAheadLogManager(
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
- fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+ fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
succeeded = true
} catch {
case ex: Exception =>
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index e8f4a7779e..cf191715d2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -22,13 +22,12 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.language.existentials
import scala.reflect.ClassTag
-import util.ManualClock
-
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
+import org.apache.spark.util.{Clock, ManualClock}
import org.apache.spark.HashPartitioner
class BasicOperationsSuite extends TestSuiteBase {
@@ -586,7 +585,7 @@ class BasicOperationsSuite extends TestSuiteBase {
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
Thread.sleep(200)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
collectRddInfo()
}
@@ -637,8 +636,8 @@ class BasicOperationsSuite extends TestSuiteBase {
ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.currentTime() === Seconds(10).milliseconds)
+ val clock = ssc.scheduler.clock.asInstanceOf[Clock]
+ assert(clock.getTimeMillis() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8f8bc61437..03c448f1df 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
import org.scalatest.concurrent.Eventually._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -61,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
@@ -324,13 +323,13 @@ class CheckpointSuite extends TestSuiteBase {
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
* modification time to `clock`'s current time.
*/
- def writeFile(i: Int, clock: ManualClock): Unit = {
+ def writeFile(i: Int, clock: Clock): Unit = {
val file = new File(testDir, i.toString)
Files.write(i + "\n", file, Charsets.UTF_8)
- assert(file.setLastModified(clock.currentTime()))
+ assert(file.setLastModified(clock.getTimeMillis()))
// Check that the file's modification date is actually the value we wrote, since rounding or
// truncation will break the test:
- assert(file.lastModified() === clock.currentTime())
+ assert(file.lastModified() === clock.getTimeMillis())
}
/**
@@ -372,13 +371,13 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Advance half a batch so that the first file is created after the StreamingContext starts
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
// Create files and advance manual clock to process them
for (i <- Seq(1, 2, 3)) {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
if (i != 3) {
// Since we want to shut down while the 3rd batch is processing
eventually(eventuallyTimeout) {
@@ -386,7 +385,7 @@ class CheckpointSuite extends TestSuiteBase {
}
}
}
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
// Wait until all files have been recorded and all batches have started
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
@@ -410,7 +409,7 @@ class CheckpointSuite extends TestSuiteBase {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
// Recover context from checkpoint file and verify whether the files that were
@@ -419,7 +418,7 @@ class CheckpointSuite extends TestSuiteBase {
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
// So that the restarted StreamingContext's clock has gone forward in time since failure
ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
- val oldClockTime = clock.currentTime()
+ val oldClockTime = clock.getTimeMillis()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -430,7 +429,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Verify that the clock has traveled forward to the expected time
eventually(eventuallyTimeout) {
- clock.currentTime() === oldClockTime
+ clock.getTimeMillis() === oldClockTime
}
// Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
val numBatchesAfterRestart = 4
@@ -441,12 +440,12 @@ class CheckpointSuite extends TestSuiteBase {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
@@ -521,12 +520,12 @@ class CheckpointSuite extends TestSuiteBase {
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.currentTime())
+ logInfo("Manual clock before advancing = " + clock.getTimeMillis())
for (i <- 1 to numBatches.toInt) {
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.currentTime())
+ logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)
val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 01084a457d..7ed6320a3d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -17,12 +17,8 @@
package org.apache.spark.streaming
-import akka.actor.Actor
-import akka.actor.Props
-import akka.util.ByteString
-
import java.io.{File, BufferedWriter, OutputStreamWriter}
-import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.net.{SocketException, ServerSocket}
import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
@@ -36,9 +32,8 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
-import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@@ -69,7 +64,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping server")
@@ -120,19 +115,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
Thread.sleep(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
Files.write(Array[Byte](i.toByte), file)
- assert(file.setLastModified(clock.currentTime()))
- assert(file.lastModified === clock.currentTime)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
@@ -179,7 +174,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
System.currentTimeMillis() - startTime < 5000) {
Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping context")
@@ -214,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping context")
@@ -256,12 +251,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enqueue the first 3 items (one by one), they should be merged in the next batch
val inputIterator = input.toIterator
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
// Enqueue the remaining items (again one by one), merged in the final batch
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
logInfo("Stopping context")
ssc.stop()
@@ -308,19 +303,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
// Over time, create files in the directory
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
val file = new File(testDir, i.toString)
Files.write(i + "\n", file, Charset.forName("UTF-8"))
- assert(file.setLastModified(clock.currentTime()))
- assert(file.lastModified === clock.currentTime)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 132ff2443f..818f551dbe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, ManualClock}
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._
@@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
preCleanupLogFiles.size should be > 1
// this depends on the number of blocks inserted using generateAndStoreData()
- manualClock.currentTime() shouldEqual 5000L
+ manualClock.getTimeMillis() shouldEqual 5000L
val cleanupThreshTime = 3000L
handler.cleanupOldBlocks(cleanupThreshTime)
@@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
val blockIds = Seq.fill(blocks.size)(generateBlockId())
val storeResults = blocks.zip(blockIds).map {
case (block, id) =>
- manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
+ manualClock.advance(500) // log rolling interval set to 1000 ms through SparkConf
logDebug("Inserting block " + id)
receivedBlockHandler.storeBlock(id, block)
}.toList
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fbb7b0bfeb..a3a0fd5187 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader}
+import org.apache.spark.streaming.util.WriteAheadLogReader
import org.apache.spark.streaming.util.WriteAheadLogSuite._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite
def incrementTime() {
val timeIncrementMillis = 2000L
- manualClock.addToTime(timeIncrementMillis)
+ manualClock.advance(timeIncrementMillis)
}
// Generate and add blocks to the given tracker
@@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
- val batchTime1 = manualClock.currentTime
+ val batchTime1 = manualClock.getTimeMillis()
tracker2.allocateBlocksToBatch(batchTime1)
tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
// Add more blocks and allocate to another batch
incrementTime()
- val batchTime2 = manualClock.currentTime
+ val batchTime2 = manualClock.getTimeMillis()
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 7d82c3e4aa..c2375ff65e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -31,10 +31,9 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
-import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -189,10 +188,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def beforeFunction() {
if (useManualClock) {
logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
} else {
logInfo("Using real clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
}
}
@@ -333,17 +332,17 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Advance manual clock
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.currentTime())
+ logInfo("Manual clock before advancing = " + clock.getTimeMillis())
if (actuallyWait) {
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
} else {
- clock.addToTime(numBatches * batchDuration.milliseconds)
+ clock.advance(numBatches * batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.currentTime())
+ logInfo("Manual clock after advancing = " + clock.getTimeMillis())
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()
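
The idiom these suites now share is: set spark.streaming.clock to the core ManualClock, pull the instance back off the scheduler, and advance it a batch at a time. A self-contained sketch of that loop under stated assumptions (the queue stream, object name, and durations are illustrative; ssc.scheduler requires Spark-internal visibility):

    package org.apache.spark.streaming

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.ManualClock

    object ManualClockIdiomSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("manual-clock-sketch")
          .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
        val batchDuration = Seconds(1)
        val ssc = new StreamingContext(conf, batchDuration)

        val queue = new Queue[RDD[Int]]()
        ssc.queueStream(queue).count().print()
        ssc.start()

        // The JobGenerator instantiated the configured ManualClock; drive it by hand.
        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
        queue += ssc.sparkContext.makeRDD(1 to 10)
        clock.advance(batchDuration.milliseconds)

        Thread.sleep(1000)  // give the manually triggered batch time to complete
        ssc.stop()
      }
    }
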
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7ce9499dc6..8335659667 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -26,7 +26,7 @@ import scala.language.{implicitConversions, postfixOps}
import WriteAheadLogSuite._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._
@@ -197,7 +197,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+ manager.cleanupOldLogs(manualClock.getTimeMillis() / 2, waitForCompletion)
if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
@@ -219,7 +219,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
// Recover old files and generate a second set of log files
val dataToWrite2 = generateRandomData()
- manualClock.addToTime(100000)
+ manualClock.advance(100000)
writeDataUsingManager(testDir, dataToWrite2, manualClock)
val logFiles2 = getLogFilesInDirectory(testDir)
assert(logFiles2.size > logFiles1.size)
@@ -279,12 +279,12 @@ object WriteAheadLogSuite {
manualClock: ManualClock = new ManualClock,
stopManager: Boolean = true
): WriteAheadLogManager = {
- if (manualClock.currentTime < 100000) manualClock.setTime(10000)
+ if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
// Ensure that 500 does not get sorted after 2000, so put a high base value.
data.foreach { item =>
- manualClock.addToTime(500)
+ manualClock.advance(500)
manager.writeToLog(item)
}
if (stopManager) manager.stop()
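
The helper above shows the pattern the whole write-ahead-log suite relies on: inject a ManualClock so that log rolling is driven by test time rather than wall-clock time. A compressed sketch of the same setup outside the suite (object name, directory, and payloads are illustrative; assumes Spark-internal visibility):

    package org.apache.spark.streaming.util

    import java.nio.ByteBuffer

    import org.apache.hadoop.conf.Configuration

    import org.apache.spark.util.ManualClock

    object WriteAheadLogClockSketch {
      def main(args: Array[String]): Unit = {
        val manualClock = new ManualClock()
        manualClock.setTime(10000L)

        // Same construction as writeDataUsingManager above: a 1-second rolling
        // interval driven entirely by the injected clock.
        val manager = new WriteAheadLogManager(
          "target/tmp/wal-sketch", new Configuration(),
          rollingIntervalSecs = 1, callerName = "WriteAheadLogClockSketch", clock = manualClock)

        // Advancing 500 ms per record crosses a rolling boundary every second
        // write, so getLogWriter(clock.getTimeMillis()) opens new segment files.
        (1 to 4).foreach { i =>
          manualClock.advance(500)
          manager.writeToLog(ByteBuffer.wrap(Array.fill(8)(i.toByte)))
        }
        manager.stop()
      }
    }
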