From f8bb719cd212f7e7f821c3f69b897985f47a2f83 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Nov 2012 17:53:56 -0800 Subject: Added a few more comments to the checkpoint-related functions. --- streaming/src/main/scala/spark/streaming/DStream.scala | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 922ff5088d..40744eac19 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -288,20 +288,27 @@ extends Serializable with Logging { * this method to save custom checkpoint data. */ protected[streaming] def updateCheckpointData(currentTime: Time) { + // Get the checkpointed RDDs from the generated RDDs val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null) .map(x => (x._1, x._2.getCheckpointData())) + // Make a copy of the existing checkpoint data val oldCheckpointData = checkpointData.clone() + + // If the new checkpoint has checkpoints then replace existing with the new one if (newCheckpointData.size > 0) { checkpointData.clear() checkpointData ++= newCheckpointData } + // Make dependencies update their checkpoint data dependencies.foreach(_.updateCheckpointData(currentTime)) + // TODO: remove this, this is just for debugging newCheckpointData.foreach { case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } } + // If old checkpoint files have been removed from checkpoint data, then remove the files if (newCheckpointData.size > 0) { (oldCheckpointData -- newCheckpointData.keySet).foreach { case (time, data) => { @@ -322,6 +329,7 @@ extends Serializable with Logging { * override the updateCheckpointData() method would also need to override this method. */ protected[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data logInfo("Restoring checkpoint data from " + checkpointData.size + " checkpointed RDDs") checkpointData.foreach { case(time, data) => { -- cgit v1.2.3 From fc3d0b602a08fdd182c2138506d1cd9952631f95 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 6 Nov 2012 17:23:31 -0800 Subject: Added FailureTestsuite for testing multiple, repeated master failures. --- .../main/scala/spark/streaming/Checkpoint.scala | 6 +- .../src/main/scala/spark/streaming/DStream.scala | 4 +- .../src/main/scala/spark/streaming/Scheduler.scala | 6 +- .../scala/spark/streaming/StreamingContext.scala | 17 +- .../scala/spark/streaming/CheckpointSuite.scala | 75 ++++---- .../test/scala/spark/streaming/FailureSuite.scala | 188 +++++++++++++++++++++ .../test/scala/spark/streaming/TestSuiteBase.scala | 9 +- 7 files changed, 256 insertions(+), 49 deletions(-) create mode 100644 streaming/src/test/scala/spark/streaming/FailureSuite.scala (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 1643f45ffb..a70fb8f73a 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -32,7 +32,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val file = new Path(path, "graph") val conf = new Configuration() val fs = file.getFileSystem(conf) - logDebug("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'") + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") if (fs.exists(file)) { val bkFile = new Path(file.getParent, file.getName + ".bk") FileUtil.copy(fs, file, fs, bkFile, true, true, conf) @@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) oos.writeObject(this) oos.close() fs.close() - logInfo("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'") + logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'") } def toBytes(): Array[Byte] = { @@ -58,7 +58,6 @@ object Checkpoint extends Logging { val fs = new Path(path).getFileSystem(new Configuration()) val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) - var detailedLog: String = "" attempts.foreach(file => { if (fs.exists(file)) { @@ -76,6 +75,7 @@ object Checkpoint extends Logging { fs.close() cp.validate() logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) return cp } catch { case e: Exception => diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 40744eac19..73096edec5 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -288,6 +288,7 @@ extends Serializable with Logging { * this method to save custom checkpoint data. */ protected[streaming] def updateCheckpointData(currentTime: Time) { + logInfo("Updating checkpoint data for time " + currentTime) // Get the checkpointed RDDs from the generated RDDs val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null) .map(x => (x._1, x._2.getCheckpointData())) @@ -319,7 +320,7 @@ extends Serializable with Logging { } } } - logInfo("Updated checkpoint data") + logInfo("Updated checkpoint data for time " + currentTime) } /** @@ -338,6 +339,7 @@ extends Serializable with Logging { } } dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") } @throws(classOf[IOException]) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 2b3f5a4829..de0fb1f3ad 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -29,10 +29,12 @@ extends Logging { // on this first trigger time of the timer. if (ssc.isCheckpointPresent) { // If manual clock is being used for testing, then - // set manual clock to the last checkpointed time + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds - clock.asInstanceOf[ManualClock].setTime(lastTime) + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 25caaf7d39..eb83aaee7a 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -18,7 +18,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID -class StreamingContext ( +final class StreamingContext ( sc_ : SparkContext, cp_ : Checkpoint ) extends Logging { @@ -61,12 +61,12 @@ class StreamingContext ( } } - val nextNetworkInputStreamId = new AtomicInteger(0) - var networkInputTracker: NetworkInputTracker = null + private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0) + private[streaming] var networkInputTracker: NetworkInputTracker = null private[streaming] var checkpointDir: String = { if (isCheckpointPresent) { - sc.setCheckpointDir(cp_.checkpointDir, true) + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true) cp_.checkpointDir } else { null @@ -87,7 +87,7 @@ class StreamingContext ( def checkpoint(dir: String, interval: Time) { if (dir != null) { - sc.setCheckpointDir(new Path(dir, "rdds-" + UUID.randomUUID.toString).toString) + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir)) checkpointDir = dir checkpointInterval = interval } else { @@ -227,8 +227,11 @@ class StreamingContext ( } def doCheckpoint(currentTime: Time) { + val startTime = System.currentTimeMillis() graph.updateCheckpointData(currentTime) new Checkpoint(this, currentTime).save(checkpointDir) + val stopTime = System.currentTimeMillis() + logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") } } @@ -247,5 +250,9 @@ object StreamingContext { prefix + "-" + time.milliseconds + "." + suffix } } + + def getSparkCheckpointDir(sscCheckpointDir: String): String = { + new Path(sscCheckpointDir, UUID.randomUUID.toString).toString + } } diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 9fdfd50be2..038827ddb0 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -52,15 +52,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { .checkpoint(stateStreamCheckpointInterval) .map(t => (t._1, t._2.self)) } - val ssc = setupStreams(input, operation) - val stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + var ssc = setupStreams(input, operation) + var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head - // Run till a time such that at least one RDD in the stream should have been checkpointed + // Run till a time such that at least one RDD in the stream should have been checkpointed, + // then check whether some RDD has been checkpointed or not ssc.start() - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - advanceClock(clock, firstNumBatches) - - // Check whether some RDD has been checkpointed or not + runStreamsWithRealDelay(ssc, firstNumBatches) logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]") assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure") stateStream.checkpointData.foreach { @@ -73,42 +71,45 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString)) - advanceClock(clock, secondNumBatches) + runStreamsWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) + ssc.stop() // Restart stream computation using the checkpoint file and check whether // checkpointed RDDs have been restored or not - ssc.stop() - val sscNew = new StreamingContext(checkpointDir) - val stateStreamNew = sscNew.graph.getOutputStreams().head.dependencies.head.dependencies.head - logInfo("Restored data of state stream = \n[" + stateStreamNew.generatedRDDs.mkString("\n") + "]") - assert(!stateStreamNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure") + ssc = new StreamingContext(checkpointDir) + stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") + assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure") - // Run one batch to generate a new checkpoint file - sscNew.start() - val clockNew = sscNew.scheduler.clock.asInstanceOf[ManualClock] - advanceClock(clockNew, 1) - - // Check whether some RDD is present in the checkpoint data or not - assert(!stateStreamNew.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure") + // Run one batch to generate a new checkpoint file and check whether some RDD + // is present in the checkpoint data or not + ssc.start() + runStreamsWithRealDelay(ssc, 1) + assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.foreach { case (time, data) => { val file = new File(data.toString) - assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") + assert(file.exists(), + "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") } } + ssc.stop() // Restart stream computation from the new checkpoint file to see whether that file has // correct checkpoint data - sscNew.stop() - val sscNewNew = new StreamingContext(checkpointDir) - val stateStreamNewNew = sscNew.graph.getOutputStreams().head.dependencies.head.dependencies.head - logInfo("Restored data of state stream = \n[" + stateStreamNew.generatedRDDs.mkString("\n") + "]") - assert(!stateStreamNewNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") - sscNewNew.start() - advanceClock(sscNewNew.scheduler.clock.asInstanceOf[ManualClock], 1) - sscNewNew.stop() + ssc = new StreamingContext(checkpointDir) + stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head + logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") + assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") + + // Adjust manual clock time as if it is being restarted after a delay + System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) + ssc.start() + runStreamsWithRealDelay(ssc, 4) + ssc.stop() + System.clearProperty("spark.streaming.manualClock.jump") } test("map and reduceByKey") { @@ -123,10 +124,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { test("reduceByKeyAndWindowInv") { val n = 10 val w = 4 - val input = (1 to n).map(x => Seq("a")).toSeq + val input = (1 to n).map(_ => Seq("a")).toSeq val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4))) val operation = (st: DStream[String]) => { - st.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, batchDuration * 4, batchDuration) + st.map(x => (x, 1)) + .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) + .checkpoint(Seconds(2)) } for (i <- Seq(2, 3, 4)) { testCheckpointedOperation(input, operation, output, i) @@ -184,7 +187,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) } - def advanceClock(clock: ManualClock, numBatches: Long) { + /** + * Advances the manual clock on the streaming scheduler by given number of batches. + * It also wait for the expected amount of time for each batch. + */ + + + def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) { + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) for (i <- 1 to numBatches.toInt) { clock.addToTime(batchDuration.milliseconds) @@ -193,4 +203,5 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) } + } \ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala new file mode 100644 index 0000000000..5b414117fc --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -0,0 +1,188 @@ +package spark.streaming + +import org.scalatest.BeforeAndAfter +import org.apache.commons.io.FileUtils +import java.io.File +import scala.runtime.RichInt +import scala.util.Random +import spark.streaming.StreamingContext._ +import collection.mutable.ArrayBuffer +import spark.Logging + +/** + * This testsuite tests master failures at random times while the stream is running using + * the real clock. + */ +class FailureSuite extends TestSuiteBase with BeforeAndAfter { + + before { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + + after { + FailureSuite.reset() + FileUtils.deleteDirectory(new File(checkpointDir)) + } + + override def framework = "CheckpointSuite" + + override def batchDuration = Milliseconds(500) + + override def checkpointDir = "checkpoint" + + override def checkpointInterval = batchDuration + + test("multiple failures with updateStateByKey") { + val n = 30 + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq + // Last output: [ (a, 465) ] for n=30 + val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { + Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + } + st.map(x => (x, 1)) + .updateStateByKey[RichInt](updateFunc) + .checkpoint(Seconds(2)) + .map(t => (t._1, t._2.self)) + } + + testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + } + + test("multiple failures with reduceByKeyAndWindow") { + val n = 30 + val w = 100 + assert(w > n, "Window should be much larger than the number of input sets in this test") + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq + // Last output: [ (a, 465) ] + val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + + val operation = (st: DStream[String]) => { + st.map(x => (x, 1)) + .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) + .checkpoint(Seconds(2)) + } + + testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + } + + + /** + * Tests stream operation with multiple master failures, and verifies whether the + * final set of output values is as expected or not. Checking the final value is + * proof that no intermediate data was lost due to master failures. + */ + def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + lastExpectedOutput: Seq[V], + numBatches: Int, + numExpectedOutput: Int + ) { + var ssc = setupStreams[U, V](input, operation) + val mergedOutput = new ArrayBuffer[Seq[V]]() + + var totalTimeRan = 0L + while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) { + new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start() + val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput) + + mergedOutput ++= output + totalTimeRan += timeRan + logInfo("New output = " + output) + logInfo("Merged output = " + mergedOutput) + logInfo("Total time spent = " + totalTimeRan) + val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + FailureSuite.failed = false + ssc = new StreamingContext(checkpointDir) + } + ssc.stop() + ssc = null + + // Verify whether the last output is the expected one + val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty)) + assert(lastOutput.toSet === lastExpectedOutput.toSet) + logInfo("Finished computation after " + FailureSuite.failureCount + " failures") + } + + /** + * Runs the streams set up in `ssc` on real clock until the expected max number of + */ + def runStreamsWithRealClock[V: ClassManifest]( + ssc: StreamingContext, + numBatches: Int, + maxExpectedOutput: Int + ): (Seq[Seq[V]], Long) = { + + System.clearProperty("spark.streaming.clock") + + assert(numBatches > 0, "Number of batches to run stream computation is zero") + assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero") + logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput) + + // Get the output buffer + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val output = outputStream.output + val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong + val startTime = System.currentTimeMillis() + + try { + // Start computation + ssc.start() + + // Wait until expected number of output items have been generated + while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) { + logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput) + Thread.sleep(100) + } + } catch { + case e: Exception => logInfo("Exception while running streams: " + e) + } finally { + ssc.stop() + } + val timeTaken = System.currentTimeMillis() - startTime + logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms") + (output, timeTaken) + } + + +} + +object FailureSuite { + var failed = false + var failureCount = 0 + + def reset() { + failed = false + failureCount = 0 + } +} + +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging { + initLogging() + + override def run() { + var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint + val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime.toLong) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime + " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) ssc.stop() + FailureSuite.failed = true + FailureSuite.failureCount += 1 + } +} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index b8c7f99603..5fb5cc504c 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -23,12 +23,9 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ def compute(validTime: Time): Option[RDD[T]] = { logInfo("Computing RDD for time " + validTime) val index = ((validTime - zeroTime) / slideTime - 1).toInt - val rdd = if (index < input.size) { - ssc.sc.makeRDD(input(index), numPartitions) - } else { - ssc.sc.makeRDD(Seq[T](), numPartitions) - } - logInfo("Created RDD " + rdd.id) + val selectedInput = if (index < input.size) input(index) else Seq[T]() + val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) + logInfo("Created RDD " + rdd.id + " with " + selectedInput) Some(rdd) } } -- cgit v1.2.3 From cc2a65f54715ff0990d5873d50eec0dedf64d409 Mon Sep 17 00:00:00 2001 From: tdas Date: Thu, 8 Nov 2012 11:17:57 +0000 Subject: Fixed bug in InputStreamsSuite --- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 2 ++ 1 file changed, 2 insertions(+) (limited to 'streaming/src') diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index c17254b809..8f892baab1 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -12,6 +12,8 @@ import org.apache.commons.io.FileUtils class InputStreamsSuite extends TestSuiteBase { + + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") test("network input stream") { // Start the server -- cgit v1.2.3