aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-01-06 00:31:19 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-06 00:31:19 -0800
commita6394bc2c094c6c662237236c2effa2dabe67910 (patch)
treea23a9eec1f941dabdc9cf013fc5273804f10e870 /streaming
parent451546aa6d2e61e43b0c0f0669f18cfb7489e584 (diff)
downloadspark-a6394bc2c094c6c662237236c2effa2dabe67910.tar.gz
spark-a6394bc2c094c6c662237236c2effa2dabe67910.tar.bz2
spark-a6394bc2c094c6c662237236c2effa2dabe67910.zip
[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage
This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600). Key changes: - Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock. - Fix a synchronization issue in ManualClock's `currentTime` method. - Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished. - Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls. - Update these tests to use the withStreamingContext fixture. Author: Josh Rosen <joshrosen@databricks.com> Closes #3801 from JoshRosen/SPARK-1600 and squashes the following commits: e4494f4 [Josh Rosen] Address a potential race when setting file modification times 8340bd0 [Josh Rosen] Use set comparisons for output. 0b9c252 [Josh Rosen] Fix some ManualClock usage problems. 1cc689f [Josh Rosen] ConcurrentHashMap -> SynchronizedMap db26c3a [Josh Rosen] Use standard timeout in ScalaTest `eventually` blocks. 3939432 [Josh Rosen] Rename StreamingTestWaiter to BatchCounter 0b9c3a1 [Josh Rosen] Wait for checkpoint to complete 863d71a [Josh Rosen] Remove Thread.sleep that was used to make task run slowly b4442c3 [Josh Rosen] batchTimeToSelectedFiles should be thread-safe 15b48ee [Josh Rosen] Replace several TestWaiter methods w/ ScalaTest eventually. fffc51c [Josh Rosen] Revert "Remove last remaining sleep() call" dbb8247 [Josh Rosen] Remove last remaining sleep() call 566a63f [Josh Rosen] Fix log message and comment typos da32f3f [Josh Rosen] Fix log message and comment typos 3689214 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-1600 c8f06b1 [Josh Rosen] Remove Thread.sleep calls in FileInputStream CheckpointSuite test. d4f2d87 [Josh Rosen] Refactor file input stream tests to not rely on SystemClock. dda1403 [Josh Rosen] Add StreamingTestWaiter class. 3c3efc3 [Josh Rosen] Synchronize `currentTime` in ManualClock a95ddc4 [Josh Rosen] Modify FileInputDStream to use Clock class.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala16
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala248
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala69
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala46
6 files changed, 251 insertions, 136 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc557..e7c5639a63 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.dstream
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
+ // This is a def so that it works during checkpoint recovery:
+ private def clock = ssc.scheduler.clock
+
// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
remember(durationToRemember)
// Map of batch-time to selected file info for the remembered batches
+ // This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]]
+ new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = System.currentTimeMillis
+ lastNewFileFindingTime = clock.currentTime()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+ val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
- batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+ batchTimeToSelectedFiles =
+ new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867ce34..d6d96d7ba0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
- var time = 0L
+ private var time = 0L
- def currentTime() = time
+ def currentTime() = this.synchronized {
+ time
+ }
def setTime(timeToSet: Long) = {
this.synchronized {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 199f5e7161..e8f4a7779e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
+ assert(clock.currentTime() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 5d232c6ade..8f8bc61437 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
package org.apache.spark.streaming
import java.io.File
-import java.nio.charset.Charset
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
@@ -45,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {
override def batchDuration = Milliseconds(500)
- override def actuallyWait = true // to allow checkpoints to be written
-
override def beforeFunction() {
super.beforeFunction()
Utils.deleteRecursively(new File(checkpointDir))
@@ -143,7 +142,6 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
- System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
}
@@ -312,109 +310,161 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}
-
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
// Set up the streaming context and input streams
+ val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's.
val testDir = Utils.createTempDir()
- var ssc = new StreamingContext(master, framework, Seconds(1))
- ssc.checkpoint(checkpointDir)
- val fileStream = ssc.textFileStream(testDir.toString)
- // Making value 3 take large time to process, to ensure that the master
- // shuts down in the middle of processing the 3rd batch
- val mappedStream = fileStream.map(s => {
- val i = s.toInt
- if (i == 3) Thread.sleep(2000)
- i
- })
-
- // Reducing over a large window to ensure that recovery from master failure
- // requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
- val outputBuffer = new ArrayBuffer[Seq[Int]]
- var outputStream = new TestOutputStream(reducedStream, outputBuffer)
- outputStream.register()
- ssc.start()
-
- // Create files and advance manual clock to process them
- // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(1000)
- for (i <- Seq(1, 2, 3)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- // wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(1000)
+ val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+ /**
+ * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
+ * modification time to `clock`'s current time.
+ */
+ def writeFile(i: Int, clock: ManualClock): Unit = {
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()))
+ // Check that the file's modification date is actually the value we wrote, since rounding or
+ // truncation will break the test:
+ assert(file.lastModified() === clock.currentTime())
}
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0, "No files processed before restart")
- ssc.stop()
- // Verify whether files created have been recorded correctly or not
- var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
- // Create files while the master is down
- for (i <- Seq(4, 5, 6)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
+ /**
+ * Returns ids that identify which files which have been recorded by the file input stream.
+ */
+ def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+ val fileInputDStream =
+ ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+ filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}
- // Recover context from checkpoint file and verify whether the files that were
- // recorded before failure were saved and successfully recovered
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+ try {
+ // This is a var because it's re-assigned when we restart from a checkpoint
+ var clock: ManualClock = null
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ ssc.checkpoint(checkpointDir)
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.textFileStream(testDir.toString)
+ // Make value 3 take a large time to process, to ensure that the driver
+ // shuts down in the middle of processing the 3rd batch
+ CheckpointSuite.batchThreeShouldBlockIndefinitely = true
+ val mappedStream = fileStream.map(s => {
+ val i = s.toInt
+ if (i == 3) {
+ while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
+ Thread.sleep(Long.MaxValue)
+ }
+ }
+ i
+ })
+
+ // Reducing over a large window to ensure that recovery from driver failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance half a batch so that the first file is created after the StreamingContext starts
+ clock.addToTime(batchDuration.milliseconds / 2)
+ // Create files and advance manual clock to process them
+ for (i <- Seq(1, 2, 3)) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ if (i != 3) {
+ // Since we want to shut down while the 3rd batch is processing
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
+ }
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ // Wait until all files have been recorded and all batches have started
+ assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
+ }
+ // Wait for a checkpoint to be written
+ val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
+ eventually(eventuallyTimeout) {
+ assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+ }
+ ssc.stop()
+ // Check that we shut down while the third batch was being processed
+ assert(batchCounter.getNumCompletedBatches === 2)
+ assert(outputStream.output.flatten === Seq(1, 3))
+ }
- // Restart stream computation
- ssc.start()
- for (i <- Seq(7, 8, 9)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
- }
- Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
- assert(outputStream.output.size > 0, "No files processed after restart")
- ssc.stop()
+ // The original StreamingContext has now been stopped.
+ CheckpointSuite.batchThreeShouldBlockIndefinitely = false
- // Verify whether files created while the driver was down have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
- // Verify whether new files created after recover have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
- // Append the new output to the old buffer
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
- outputBuffer ++= outputStream.output
-
- val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
- logInfo("--------------------------------")
- logInfo("output, size = " + outputBuffer.size)
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output, size = " + expectedOutput.size)
- expectedOutput.foreach(x => logInfo("[" + x + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- val output = outputBuffer.flatMap(x => x)
- assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
- output.foreach(o => // To ensure all the inputs are correctly added cumulatively
- assert(expectedOutput.contains(o), "Expected value " + o + " not found")
- )
- // To ensure that all the inputs were received correctly
- assert(expectedOutput.last === output.last)
- Utils.deleteRecursively(testDir)
+ // Create files while the streaming driver is down
+ for (i <- Seq(4, 5, 6)) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
+ logInfo("*********** RESTARTING ************")
+ withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+ // So that the restarted StreamingContext's clock has gone forward in time since failure
+ ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
+ val oldClockTime = clock.currentTime()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val batchCounter = new BatchCounter(ssc)
+ val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ // Check that we remember files that were recorded before the restart
+ assert(recordedFiles(ssc) === Seq(1, 2, 3))
+
+ // Restart stream computation
+ ssc.start()
+ // Verify that the clock has traveled forward to the expected time
+ eventually(eventuallyTimeout) {
+ clock.currentTime() === oldClockTime
+ }
+ // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
+ val numBatchesAfterRestart = 4
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
+ }
+ for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
+ }
+ }
+ clock.addToTime(batchDuration.milliseconds)
+ logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+ assert(outputStream.output.size > 0, "No files processed after restart")
+ ssc.stop()
+
+ // Verify whether files created while the driver was down (4, 5, 6) and files created after
+ // recovery (7, 8, 9) have been recorded
+ assert(recordedFiles(ssc) === (1 to 9))
+
+ // Append the new output to the old buffer
+ outputBuffer ++= outputStream.output
+
+ // Verify whether all the elements received are as expected
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+ assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+ }
+ } finally {
+ Utils.deleteRecursively(testDir)
+ }
}
@@ -471,12 +521,12 @@ class CheckpointSuite extends TestSuiteBase {
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
for (i <- 1 to numBatches.toInt) {
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.time)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
Thread.sleep(batchDuration.milliseconds)
val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -485,3 +535,7 @@ class CheckpointSuite extends TestSuiteBase {
outputStream.output.map(_.flatten)
}
}
+
+private object CheckpointSuite extends Serializable {
+ var batchThreeShouldBlockIndefinitely: Boolean = true
+} \ No newline at end of file
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a4a9..bddf51e130 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.Files
@@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
def testFileStream(newFilesOnly: Boolean) {
- var ssc: StreamingContext = null
val testDir: File = null
try {
+ val batchDuration = Seconds(2)
val testDir = Utils.createTempDir()
+ // Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
- Thread.sleep(1000)
// Set up the streaming context and input streams
- val newConf = conf.clone.set(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- ssc = new StreamingContext(newConf, batchDuration)
- val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
- testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(fileStream, outputBuffer)
- outputStream.register()
- ssc.start()
-
- // Create files in the directory
- val input = Seq(1, 2, 3, 4, 5)
- input.foreach { i =>
- Thread.sleep(batchDuration.milliseconds)
- val file = new File(testDir, i.toString)
- Files.write(i + "\n", file, Charset.forName("UTF-8"))
- logInfo("Created file " + file)
- }
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+ testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.addToTime(batchDuration.milliseconds / 2)
+
+ // Over time, create files in the directory
+ val input = Seq(1, 2, 3, 4, 5)
+ input.foreach { i =>
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charset.forName("UTF-8"))
+ assert(file.setLastModified(clock.currentTime()))
+ assert(file.lastModified === clock.currentTime)
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
- // Verify that all the files have been read
- val expectedOutput = if (newFilesOnly) {
- input.map(_.toString).toSet
- } else {
- (Seq(0) ++ input).map(_.toString).toSet
- }
- eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+ // Verify that all the files have been read
+ val expectedOutput = if (newFilesOnly) {
+ input.map(_.toString).toSet
+ } else {
+ (Seq(0) ++ input).map(_.toString).toSet
+ }
assert(outputBuffer.flatten.toSet === expectedOutput)
}
} finally {
- if (ssc != null) ssc.stop()
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f63c6..7d82c3e4aa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SynchronizedBuffer
+import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
+import org.scalatest.concurrent.Eventually.timeout
+import org.scalatest.concurrent.PatienceConfiguration
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
@@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
}
/**
+ * An object that counts the number of started / completed batches. This is implemented using a
+ * StreamingListener. Constructing a new instance automatically registers a StreamingListener on
+ * the given StreamingContext.
+ */
+class BatchCounter(ssc: StreamingContext) {
+
+ // All access to this state should be guarded by `BatchCounter.this.synchronized`
+ private var numCompletedBatches = 0
+ private var numStartedBatches = 0
+
+ private val listener = new StreamingListener {
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+ BatchCounter.this.synchronized {
+ numStartedBatches += 1
+ BatchCounter.this.notifyAll()
+ }
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+ BatchCounter.this.synchronized {
+ numCompletedBatches += 1
+ BatchCounter.this.notifyAll()
+ }
+ }
+ ssc.addStreamingListener(listener)
+
+ def getNumCompletedBatches: Int = this.synchronized {
+ numCompletedBatches
+ }
+
+ def getNumStartedBatches: Int = this.synchronized {
+ numStartedBatches
+ }
+}
+
+/**
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
* to run user-defined set of input on user-defined stream operations, and verify the output.
*/
@@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
.setMaster(master)
.setAppName(framework)
+ // Timeout for use in ScalaTest `eventually` blocks
+ val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
+
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
@@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Advance manual clock
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
if (actuallyWait) {
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
@@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} else {
clock.addToTime(numBatches * batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.time)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()