aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-02-19 15:35:23 -0800
committerAndrew Or <andrew@databricks.com>2015-02-19 15:35:23 -0800
commit34b7c35380c88569a1396fb4ed991a0bed4288e7 (patch)
treedeb25f1dd88477aff0dc19904cbdc9aedc0cc7d8 /streaming
parentad6b169dee84df175b51933b7a3ad7f0bbc52cf3 (diff)
downloadspark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.gz
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.bz2
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.zip
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names. Author: Sean Owen <sowen@cloudera.com> Closes #4514 from srowen/SPARK-4682 and squashes the following commits: 5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark] 169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names 277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis() 160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock 7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala89
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala5
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala33
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala37
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala10
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala10
16 files changed, 82 insertions, 171 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4f7db41abe..22de8c02e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -88,7 +88,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -161,7 +161,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = clock.currentTime()
+ lastNewFileFindingTime = clock.getTimeMillis()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -174,7 +174,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = clock.currentTime() - lastNewFileFindingTime
+ val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 79263a7183..ee5e639b26 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.SystemClock
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f7a8ebee8a..dcdc27d29c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8632c94349..ac92774a38 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,8 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
-import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, ManualClock}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -45,8 +46,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clock = {
val clockClass = ssc.sc.conf.get(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ "spark.streaming.clock", "org.apache.spark.util.SystemClock")
+ try {
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ } catch {
+ case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
+ val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
+ Class.forName(newClockClass).newInstance().asInstanceOf[Clock]
+ }
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index e19ac939f9..200cf4ef4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.util.{Clock, Utils}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -150,7 +150,7 @@ private[streaming] class ReceivedBlockTracker(
* returns only after the files are cleaned up.
*/
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
- assert(cleanupThreshTime.milliseconds < clock.currentTime())
+ assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
deleted file mode 100644
index d6d96d7ba0..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-private[streaming]
-trait Clock {
- def currentTime(): Long
- def waitTillTime(targetTime: Long): Long
-}
-
-private[streaming]
-class SystemClock() extends Clock {
-
- val minPollTime = 25L
-
- def currentTime(): Long = {
- System.currentTimeMillis()
- }
-
- def waitTillTime(targetTime: Long): Long = {
- var currentTime = 0L
- currentTime = System.currentTimeMillis()
-
- var waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
-
- val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
-
- while (true) {
- currentTime = System.currentTimeMillis()
- waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
- val sleepTime = math.min(waitTime, pollTime)
- Thread.sleep(sleepTime)
- }
- -1
- }
-}
-
-private[streaming]
-class ManualClock() extends Clock {
-
- private var time = 0L
-
- def currentTime() = this.synchronized {
- time
- }
-
- def setTime(timeToSet: Long) = {
- this.synchronized {
- time = timeToSet
- this.notifyAll()
- }
- }
-
- def addToTime(timeToAdd: Long) = {
- this.synchronized {
- time += timeToAdd
- this.notifyAll()
- }
- }
- def waitTillTime(targetTime: Long): Long = {
- this.synchronized {
- while (time < targetTime) {
- this.wait(100)
- }
- }
- currentTime()
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 1a616a0434..c8eef833eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.util
import org.apache.spark.Logging
+import org.apache.spark.util.{Clock, SystemClock}
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
@@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* current system time.
*/
def getStartTime(): Long = {
- (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
/**
@@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* more than current time.
*/
def getRestartTime(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
+ val gap = clock.getTimeMillis() - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 166661b749..985ded9111 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,13 +19,12 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
import WriteAheadLogManager._
/**
@@ -82,7 +81,7 @@ private[streaming] class WriteAheadLogManager(
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
- fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+ fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
succeeded = true
} catch {
case ex: Exception =>
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index e8f4a7779e..cf191715d2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -22,13 +22,12 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.language.existentials
import scala.reflect.ClassTag
-import util.ManualClock
-
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
+import org.apache.spark.util.{Clock, ManualClock}
import org.apache.spark.HashPartitioner
class BasicOperationsSuite extends TestSuiteBase {
@@ -586,7 +585,7 @@ class BasicOperationsSuite extends TestSuiteBase {
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
Thread.sleep(200)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
collectRddInfo()
}
@@ -637,8 +636,8 @@ class BasicOperationsSuite extends TestSuiteBase {
ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.currentTime() === Seconds(10).milliseconds)
+ val clock = ssc.scheduler.clock.asInstanceOf[Clock]
+ assert(clock.getTimeMillis() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8f8bc61437..03c448f1df 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
import org.scalatest.concurrent.Eventually._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -61,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
@@ -324,13 +323,13 @@ class CheckpointSuite extends TestSuiteBase {
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
* modification time to `clock`'s current time.
*/
- def writeFile(i: Int, clock: ManualClock): Unit = {
+ def writeFile(i: Int, clock: Clock): Unit = {
val file = new File(testDir, i.toString)
Files.write(i + "\n", file, Charsets.UTF_8)
- assert(file.setLastModified(clock.currentTime()))
+ assert(file.setLastModified(clock.getTimeMillis()))
// Check that the file's modification date is actually the value we wrote, since rounding or
// truncation will break the test:
- assert(file.lastModified() === clock.currentTime())
+ assert(file.lastModified() === clock.getTimeMillis())
}
/**
@@ -372,13 +371,13 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Advance half a batch so that the first file is created after the StreamingContext starts
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
// Create files and advance manual clock to process them
for (i <- Seq(1, 2, 3)) {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
if (i != 3) {
// Since we want to shut down while the 3rd batch is processing
eventually(eventuallyTimeout) {
@@ -386,7 +385,7 @@ class CheckpointSuite extends TestSuiteBase {
}
}
}
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
// Wait until all files have been recorded and all batches have started
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
@@ -410,7 +409,7 @@ class CheckpointSuite extends TestSuiteBase {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
// Recover context from checkpoint file and verify whether the files that were
@@ -419,7 +418,7 @@ class CheckpointSuite extends TestSuiteBase {
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
// So that the restarted StreamingContext's clock has gone forward in time since failure
ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
- val oldClockTime = clock.currentTime()
+ val oldClockTime = clock.getTimeMillis()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -430,7 +429,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Verify that the clock has traveled forward to the expected time
eventually(eventuallyTimeout) {
- clock.currentTime() === oldClockTime
+ clock.getTimeMillis() === oldClockTime
}
// Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
val numBatchesAfterRestart = 4
@@ -441,12 +440,12 @@ class CheckpointSuite extends TestSuiteBase {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
@@ -521,12 +520,12 @@ class CheckpointSuite extends TestSuiteBase {
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.currentTime())
+ logInfo("Manual clock before advancing = " + clock.getTimeMillis())
for (i <- 1 to numBatches.toInt) {
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.currentTime())
+ logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)
val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 01084a457d..7ed6320a3d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -17,12 +17,8 @@
package org.apache.spark.streaming
-import akka.actor.Actor
-import akka.actor.Props
-import akka.util.ByteString
-
import java.io.{File, BufferedWriter, OutputStreamWriter}
-import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.net.{SocketException, ServerSocket}
import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
@@ -36,9 +32,8 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
-import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@@ -69,7 +64,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping server")
@@ -120,19 +115,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
Thread.sleep(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
Files.write(Array[Byte](i.toByte), file)
- assert(file.setLastModified(clock.currentTime()))
- assert(file.lastModified === clock.currentTime)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
@@ -179,7 +174,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
System.currentTimeMillis() - startTime < 5000) {
Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping context")
@@ -214,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping context")
@@ -256,12 +251,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enqueue the first 3 items (one by one), they should be merged in the next batch
val inputIterator = input.toIterator
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
// Enqueue the remaining items (again one by one), merged in the final batch
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
logInfo("Stopping context")
ssc.stop()
@@ -308,19 +303,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
- clock.addToTime(batchDuration.milliseconds / 2)
+ clock.advance(batchDuration.milliseconds / 2)
// Over time, create files in the directory
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
val file = new File(testDir, i.toString)
Files.write(i + "\n", file, Charset.forName("UTF-8"))
- assert(file.setLastModified(clock.currentTime()))
- assert(file.lastModified === clock.currentTime)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 132ff2443f..818f551dbe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, ManualClock}
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._
@@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
preCleanupLogFiles.size should be > 1
// this depends on the number of blocks inserted using generateAndStoreData()
- manualClock.currentTime() shouldEqual 5000L
+ manualClock.getTimeMillis() shouldEqual 5000L
val cleanupThreshTime = 3000L
handler.cleanupOldBlocks(cleanupThreshTime)
@@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
val blockIds = Seq.fill(blocks.size)(generateBlockId())
val storeResults = blocks.zip(blockIds).map {
case (block, id) =>
- manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
+ manualClock.advance(500) // log rolling interval set to 1000 ms through SparkConf
logDebug("Inserting block " + id)
receivedBlockHandler.storeBlock(id, block)
}.toList
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fbb7b0bfeb..a3a0fd5187 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader}
+import org.apache.spark.streaming.util.WriteAheadLogReader
import org.apache.spark.streaming.util.WriteAheadLogSuite._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite
def incrementTime() {
val timeIncrementMillis = 2000L
- manualClock.addToTime(timeIncrementMillis)
+ manualClock.advance(timeIncrementMillis)
}
// Generate and add blocks to the given tracker
@@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
- val batchTime1 = manualClock.currentTime
+ val batchTime1 = manualClock.getTimeMillis()
tracker2.allocateBlocksToBatch(batchTime1)
tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
// Add more blocks and allocate to another batch
incrementTime()
- val batchTime2 = manualClock.currentTime
+ val batchTime2 = manualClock.getTimeMillis()
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 7d82c3e4aa..c2375ff65e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -31,10 +31,9 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
-import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -189,10 +188,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def beforeFunction() {
if (useManualClock) {
logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
} else {
logInfo("Using real clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
}
}
@@ -333,17 +332,17 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Advance manual clock
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.currentTime())
+ logInfo("Manual clock before advancing = " + clock.getTimeMillis())
if (actuallyWait) {
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
} else {
- clock.addToTime(numBatches * batchDuration.milliseconds)
+ clock.advance(numBatches * batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.currentTime())
+ logInfo("Manual clock after advancing = " + clock.getTimeMillis())
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7ce9499dc6..8335659667 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -26,7 +26,7 @@ import scala.language.{implicitConversions, postfixOps}
import WriteAheadLogSuite._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._
@@ -197,7 +197,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+ manager.cleanupOldLogs(manualClock.getTimeMillis() / 2, waitForCompletion)
if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
@@ -219,7 +219,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
// Recover old files and generate a second set of log files
val dataToWrite2 = generateRandomData()
- manualClock.addToTime(100000)
+ manualClock.advance(100000)
writeDataUsingManager(testDir, dataToWrite2, manualClock)
val logFiles2 = getLogFilesInDirectory(testDir)
assert(logFiles2.size > logFiles1.size)
@@ -279,12 +279,12 @@ object WriteAheadLogSuite {
manualClock: ManualClock = new ManualClock,
stopManager: Boolean = true
): WriteAheadLogManager = {
- if (manualClock.currentTime < 100000) manualClock.setTime(10000)
+ if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
// Ensure that 500 does not get sorted after 2000, so put a high base value.
data.foreach { item =>
- manualClock.addToTime(500)
+ manualClock.advance(500)
manager.writeToLog(item)
}
if (stopManager) manager.stop()