diff options
Diffstat (limited to 'streaming/src/test')
8 files changed, 57 insertions, 65 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 1e24da7f5f..cfedb5a042 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index e8f4a7779e..cf191715d2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -22,13 +22,12 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.language.existentials import scala.reflect.ClassTag -import util.ManualClock - import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, WindowedDStream} +import org.apache.spark.util.{Clock, ManualClock} import org.apache.spark.HashPartitioner class BasicOperationsSuite extends TestSuiteBase { @@ -586,7 +585,7 @@ class BasicOperationsSuite extends TestSuiteBase { for (i <- 0 until input.size) { testServer.send(input(i).toString + "\n") Thread.sleep(200) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) collectRddInfo() } @@ -637,8 +636,8 @@ class BasicOperationsSuite extends TestSuiteBase { ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]] if (rememberDuration != null) ssc.remember(rememberDuration) val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.currentTime() === Seconds(10).milliseconds) + val clock = ssc.scheduler.clock.asInstanceOf[Clock] + assert(clock.getTimeMillis() === Seconds(10).milliseconds) assert(output.size === numExpectedOutput) operatedStream } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 8f8bc61437..03c448f1df 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -32,8 +32,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput import org.scalatest.concurrent.Eventually._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} -import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, ManualClock, Utils} /** * This test suites tests the checkpointing functionality of DStreams - @@ -61,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock") val stateStreamCheckpointInterval = Seconds(1) val fs = FileSystem.getLocal(new Configuration()) @@ -324,13 +323,13 @@ class CheckpointSuite extends TestSuiteBase { * Writes a file named `i` (which contains the number `i`) to the test directory and sets its * modification time to `clock`'s current time. */ - def writeFile(i: Int, clock: ManualClock): Unit = { + def writeFile(i: Int, clock: Clock): Unit = { val file = new File(testDir, i.toString) Files.write(i + "\n", file, Charsets.UTF_8) - assert(file.setLastModified(clock.currentTime())) + assert(file.setLastModified(clock.getTimeMillis())) // Check that the file's modification date is actually the value we wrote, since rounding or // truncation will break the test: - assert(file.lastModified() === clock.currentTime()) + assert(file.lastModified() === clock.getTimeMillis()) } /** @@ -372,13 +371,13 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() // Advance half a batch so that the first file is created after the StreamingContext starts - clock.addToTime(batchDuration.milliseconds / 2) + clock.advance(batchDuration.milliseconds / 2) // Create files and advance manual clock to process them for (i <- Seq(1, 2, 3)) { writeFile(i, clock) // Advance the clock after creating the file to avoid a race when // setting its modification time - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) if (i != 3) { // Since we want to shut down while the 3rd batch is processing eventually(eventuallyTimeout) { @@ -386,7 +385,7 @@ class CheckpointSuite extends TestSuiteBase { } } } - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) eventually(eventuallyTimeout) { // Wait until all files have been recorded and all batches have started assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) @@ -410,7 +409,7 @@ class CheckpointSuite extends TestSuiteBase { writeFile(i, clock) // Advance the clock after creating the file to avoid a race when // setting its modification time - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) } // Recover context from checkpoint file and verify whether the files that were @@ -419,7 +418,7 @@ class CheckpointSuite extends TestSuiteBase { withStreamingContext(new StreamingContext(checkpointDir)) { ssc => // So that the restarted StreamingContext's clock has gone forward in time since failure ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString) - val oldClockTime = clock.currentTime() + val oldClockTime = clock.getTimeMillis() clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val batchCounter = new BatchCounter(ssc) val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] @@ -430,7 +429,7 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() // Verify that the clock has traveled forward to the expected time eventually(eventuallyTimeout) { - clock.currentTime() === oldClockTime + clock.getTimeMillis() === oldClockTime } // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch) val numBatchesAfterRestart = 4 @@ -441,12 +440,12 @@ class CheckpointSuite extends TestSuiteBase { writeFile(i, clock) // Advance the clock after creating the file to avoid a race when // setting its modification time - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1) } } - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() @@ -521,12 +520,12 @@ class CheckpointSuite extends TestSuiteBase { */ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.currentTime()) + logInfo("Manual clock before advancing = " + clock.getTimeMillis()) for (i <- 1 to numBatches.toInt) { - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) Thread.sleep(batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.currentTime()) + logInfo("Manual clock after advancing = " + clock.getTimeMillis()) Thread.sleep(batchDuration.milliseconds) val outputStream = ssc.graph.getOutputStreams.filter { dstream => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 01084a457d..7ed6320a3d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -17,12 +17,8 @@ package org.apache.spark.streaming -import akka.actor.Actor -import akka.actor.Props -import akka.util.ByteString - import java.io.{File, BufferedWriter, OutputStreamWriter} -import java.net.{InetSocketAddress, SocketException, ServerSocket} +import java.net.{SocketException, ServerSocket} import java.nio.charset.Charset import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import java.util.concurrent.atomic.AtomicInteger @@ -36,9 +32,8 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.util.Utils -import org.apache.spark.streaming.receiver.{ActorHelper, Receiver} +import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.rdd.RDD import org.apache.hadoop.io.{Text, LongWritable} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat @@ -69,7 +64,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { for (i <- 0 until input.size) { testServer.send(input(i).toString + "\n") Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) } Thread.sleep(1000) logInfo("Stopping server") @@ -120,19 +115,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Advance the clock so that the files are created after StreamingContext starts, but // not enough to trigger a batch - clock.addToTime(batchDuration.milliseconds / 2) + clock.advance(batchDuration.milliseconds / 2) val input = Seq(1, 2, 3, 4, 5) input.foreach { i => Thread.sleep(batchDuration.milliseconds) val file = new File(testDir, i.toString) Files.write(Array[Byte](i.toByte), file) - assert(file.setLastModified(clock.currentTime())) - assert(file.lastModified === clock.currentTime) + assert(file.setLastModified(clock.getTimeMillis())) + assert(file.lastModified === clock.getTimeMillis()) logInfo("Created file " + file) // Advance the clock after creating the file to avoid a race when // setting its modification time - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === i) } @@ -179,7 +174,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) && System.currentTimeMillis() - startTime < 5000) { Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) } Thread.sleep(1000) logInfo("Stopping context") @@ -214,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { for (i <- 0 until input.size) { // Enqueue more than 1 item per tick but they should dequeue one at a time inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) } Thread.sleep(1000) logInfo("Stopping context") @@ -256,12 +251,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Enqueue the first 3 items (one by one), they should be merged in the next batch val inputIterator = input.toIterator inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) Thread.sleep(1000) // Enqueue the remaining items (again one by one), merged in the final batch inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) Thread.sleep(1000) logInfo("Stopping context") ssc.stop() @@ -308,19 +303,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Advance the clock so that the files are created after StreamingContext starts, but // not enough to trigger a batch - clock.addToTime(batchDuration.milliseconds / 2) + clock.advance(batchDuration.milliseconds / 2) // Over time, create files in the directory val input = Seq(1, 2, 3, 4, 5) input.foreach { i => val file = new File(testDir, i.toString) Files.write(i + "\n", file, Charset.forName("UTF-8")) - assert(file.setLastModified(clock.currentTime())) - assert(file.lastModified === clock.currentTime) + assert(file.setLastModified(clock.getTimeMillis())) + assert(file.lastModified === clock.getTimeMillis()) logInfo("Created file " + file) // Advance the clock after creating the file to avoid a race when // setting its modification time - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === i) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 132ff2443f..818f551dbe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.{AkkaUtils, ManualClock} import WriteAheadLogBasedBlockHandler._ import WriteAheadLogSuite._ @@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche preCleanupLogFiles.size should be > 1 // this depends on the number of blocks inserted using generateAndStoreData() - manualClock.currentTime() shouldEqual 5000L + manualClock.getTimeMillis() shouldEqual 5000L val cleanupThreshTime = 3000L handler.cleanupOldBlocks(cleanupThreshTime) @@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche val blockIds = Seq.fill(blocks.size)(generateBlockId()) val storeResults = blocks.zip(blockIds).map { case (block, id) => - manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf + manualClock.advance(500) // log rolling interval set to 1000 ms through SparkConf logDebug("Inserting block " + id) receivedBlockHandler.storeBlock(id, block) }.toList diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index fbb7b0bfeb..a3a0fd5187 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader} +import org.apache.spark.streaming.util.WriteAheadLogReader import org.apache.spark.streaming.util.WriteAheadLogSuite._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} class ReceivedBlockTrackerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { @@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite def incrementTime() { val timeIncrementMillis = 2000L - manualClock.addToTime(timeIncrementMillis) + manualClock.advance(timeIncrementMillis) } // Generate and add blocks to the given tracker @@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 // Allocate blocks to batch and verify whether the unallocated blocks got allocated - val batchTime1 = manualClock.currentTime + val batchTime1 = manualClock.getTimeMillis() tracker2.allocateBlocksToBatch(batchTime1) tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 // Add more blocks and allocate to another batch incrementTime() - val batchTime2 = manualClock.currentTime + val batchTime2 = manualClock.getTimeMillis() val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 7d82c3e4aa..c2375ff65e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -31,10 +31,9 @@ import org.scalatest.concurrent.PatienceConfiguration import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener} -import org.apache.spark.streaming.util.ManualClock import org.apache.spark.{SparkConf, Logging} import org.apache.spark.rdd.RDD -import org.apache.spark.util.Utils +import org.apache.spark.util.{ManualClock, Utils} /** * This is a input stream just for the testsuites. This is equivalent to a checkpointable, @@ -189,10 +188,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { def beforeFunction() { if (useManualClock) { logInfo("Using manual clock") - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock") } else { logInfo("Using real clock") - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock") } } @@ -333,17 +332,17 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Advance manual clock val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.currentTime()) + logInfo("Manual clock before advancing = " + clock.getTimeMillis()) if (actuallyWait) { for (i <- 1 to numBatches) { logInfo("Actually waiting for " + batchDuration) - clock.addToTime(batchDuration.milliseconds) + clock.advance(batchDuration.milliseconds) Thread.sleep(batchDuration.milliseconds) } } else { - clock.addToTime(numBatches * batchDuration.milliseconds) + clock.advance(numBatches * batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.currentTime()) + logInfo("Manual clock after advancing = " + clock.getTimeMillis()) // Wait until expected number of output items have been generated val startTime = System.currentTimeMillis() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 7ce9499dc6..8335659667 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -26,7 +26,7 @@ import scala.language.{implicitConversions, postfixOps} import WriteAheadLogSuite._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.util.Utils +import org.apache.spark.util.{ManualClock, Utils} import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually._ @@ -197,7 +197,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion) + manager.cleanupOldLogs(manualClock.getTimeMillis() / 2, waitForCompletion) if (waitForCompletion) { assert(getLogFilesInDirectory(testDir).size < logFiles.size) @@ -219,7 +219,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { // Recover old files and generate a second set of log files val dataToWrite2 = generateRandomData() - manualClock.addToTime(100000) + manualClock.advance(100000) writeDataUsingManager(testDir, dataToWrite2, manualClock) val logFiles2 = getLogFilesInDirectory(testDir) assert(logFiles2.size > logFiles1.size) @@ -279,12 +279,12 @@ object WriteAheadLogSuite { manualClock: ManualClock = new ManualClock, stopManager: Boolean = true ): WriteAheadLogManager = { - if (manualClock.currentTime < 100000) manualClock.setTime(10000) + if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000) val manager = new WriteAheadLogManager(logDirectory, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock) // Ensure that 500 does not get sorted after 2000, so put a high base value. data.foreach { item => - manualClock.addToTime(500) + manualClock.advance(500) manager.writeToLog(item) } if (stopManager) manager.stop() |