-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala   16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala                   6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala          2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala             248
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala            69
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                46
6 files changed, 251 insertions, 136 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc557..e7c5639a63 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.dstream
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
+ // This is a def so that it works during checkpoint recovery:
+ private def clock = ssc.scheduler.clock
+
// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
remember(durationToRemember)
// Map of batch-time to selected file info for the remembered batches
+ // This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]]
+ new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = System.currentTimeMillis
+ lastNewFileFindingTime = clock.currentTime()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+ val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
- batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+ batchTimeToSelectedFiles =
+ new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
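
The "private def clock = ssc.scheduler.clock" accessor added above is deliberately a def rather than a val: after checkpoint recovery the DStream is deserialized, not constructed, and its transient ssc field is only re-populated once the recovered graph is attached to the restarted StreamingContext, so a field captured at original construction time would not resolve against the new context's clock. A minimal sketch of that distinction, using invented names (these are not Spark classes):

    class FakeClock { def currentTime(): Long = System.currentTimeMillis() }
    class FakeContext { val clock = new FakeClock }

    class RestoredStream extends Serializable {
      // Like DStream.ssc, this is transient: it is null right after deserialization and is
      // only set again when the recovered graph is attached to the new context.
      @transient private var context: FakeContext = _

      // A val would be evaluated once, when the object was first constructed:
      //   private val clock = context.clock
      // A def defers the lookup, so it always resolves against the currently attached context:
      private def clock: FakeClock = context.clock

      def attach(c: FakeContext): Unit = { context = c }
      def now(): Long = clock.currentTime()
    }
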
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867ce34..d6d96d7ba0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
- var time = 0L
+ private var time = 0L
- def currentTime() = time
+ def currentTime() = this.synchronized {
+ time
+ }
def setTime(timeToSet: Long) = {
this.synchronized {
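
Making ManualClock.currentTime() synchronized matters because two threads touch the clock in these tests: the test thread advances it with setTime()/addToTime() while the streaming scheduler thread reads it (and, in the real class, blocks in waitTillTime() until the target time is reached). A minimal, self-contained sketch of the same monitor-based pattern, not the full Spark class:

    class ToyManualClock {
      private var time = 0L

      def currentTime(): Long = synchronized { time }

      def advance(ms: Long): Unit = synchronized {
        time += ms
        notifyAll()          // wake any thread blocked in waitUntil()
      }

      def waitUntil(target: Long): Long = synchronized {
        while (time < target) { wait(100) }
        time
      }
    }
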
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 199f5e7161..e8f4a7779e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
+ assert(clock.currentTime() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 5d232c6ade..8f8bc61437 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
package org.apache.spark.streaming
import java.io.File
-import java.nio.charset.Charset
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
@@ -45,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {
override def batchDuration = Milliseconds(500)
- override def actuallyWait = true // to allow checkpoints to be written
-
override def beforeFunction() {
super.beforeFunction()
Utils.deleteRecursively(new File(checkpointDir))
@@ -143,7 +142,6 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
- System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
}
@@ -312,109 +310,161 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}
-
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
// Set up the streaming context and input streams
+ val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's.
val testDir = Utils.createTempDir()
- var ssc = new StreamingContext(master, framework, Seconds(1))
- ssc.checkpoint(checkpointDir)
- val fileStream = ssc.textFileStream(testDir.toString)
- // Making value 3 take large time to process, to ensure that the master
- // shuts down in the middle of processing the 3rd batch
- val mappedStream = fileStream.map(s => {
- val i = s.toInt
- if (i == 3) Thread.sleep(2000)
- i
- })
-
- // Reducing over a large window to ensure that recovery from master failure
- // requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
- val outputBuffer = new ArrayBuffer[Seq[Int]]
- var outputStream = new TestOutputStream(reducedStream, outputBuffer)
- outputStream.register()
- ssc.start()
-
- // Create files and advance manual clock to process them
- // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(1000)
- for (i <- Seq(1, 2, 3)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- // wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(1000)
+ val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+ /**
+ * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
+ * modification time to `clock`'s current time.
+ */
+ def writeFile(i: Int, clock: ManualClock): Unit = {
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()))
+ // Check that the file's modification date is actually the value we wrote, since rounding or
+ // truncation will break the test:
+ assert(file.lastModified() === clock.currentTime())
}
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0, "No files processed before restart")
- ssc.stop()
- // Verify whether files created have been recorded correctly or not
- var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
- // Create files while the master is down
- for (i <- Seq(4, 5, 6)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
+ /**
+ * Returns ids that identify which files have been recorded by the file input stream.
+ */
+ def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+ val fileInputDStream =
+ ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+ filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}
- // Recover context from checkpoint file and verify whether the files that were
- // recorded before failure were saved and successfully recovered
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+ try {
+ // This is a var because it's re-assigned when we restart from a checkpoint
+ var clock: ManualClock = null
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ ssc.checkpoint(checkpointDir)
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.textFileStream(testDir.toString)
+ // Make value 3 take a large time to process, to ensure that the driver
+ // shuts down in the middle of processing the 3rd batch
+ CheckpointSuite.batchThreeShouldBlockIndefinitely = true
+ val mappedStream = fileStream.map(s => {
+ val i = s.toInt
+ if (i == 3) {
+ while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
+ Thread.sleep(Long.MaxValue)
+ }
+ }
+ i
+ })
+
+ // Reducing over a large window to ensure that recovery from driver failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance half a batch so that the first file is created after the StreamingContext starts
+ clock.addToTime(batchDuration.milliseconds / 2)
+ // Create files and advance manual clock to process them
+ for (i <- Seq(1, 2, 3)) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ if (i != 3) {
+ // Since we want to shut down while the 3rd batch is processing
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
+ }
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ // Wait until all files have been recorded and all batches have started
+ assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
+ }
+ // Wait for a checkpoint to be written
+ val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
+ eventually(eventuallyTimeout) {
+ assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+ }
+ ssc.stop()
+ // Check that we shut down while the third batch was being processed
+ assert(batchCounter.getNumCompletedBatches === 2)
+ assert(outputStream.output.flatten === Seq(1, 3))
+ }
- // Restart stream computation
- ssc.start()
- for (i <- Seq(7, 8, 9)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
- }
- Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
- assert(outputStream.output.size > 0, "No files processed after restart")
- ssc.stop()
+ // The original StreamingContext has now been stopped.
+ CheckpointSuite.batchThreeShouldBlockIndefinitely = false
- // Verify whether files created while the driver was down have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
- // Verify whether new files created after recover have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
- // Append the new output to the old buffer
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
- outputBuffer ++= outputStream.output
-
- val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
- logInfo("--------------------------------")
- logInfo("output, size = " + outputBuffer.size)
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output, size = " + expectedOutput.size)
- expectedOutput.foreach(x => logInfo("[" + x + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- val output = outputBuffer.flatMap(x => x)
- assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
- output.foreach(o => // To ensure all the inputs are correctly added cumulatively
- assert(expectedOutput.contains(o), "Expected value " + o + " not found")
- )
- // To ensure that all the inputs were received correctly
- assert(expectedOutput.last === output.last)
- Utils.deleteRecursively(testDir)
+ // Create files while the streaming driver is down
+ for (i <- Seq(4, 5, 6)) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
+ logInfo("*********** RESTARTING ************")
+ withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+ // So that the restarted StreamingContext's clock has gone forward in time since failure
+ ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
+ val oldClockTime = clock.currentTime()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val batchCounter = new BatchCounter(ssc)
+ val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ // Check that we remember files that were recorded before the restart
+ assert(recordedFiles(ssc) === Seq(1, 2, 3))
+
+ // Restart stream computation
+ ssc.start()
+ // Verify that the clock has traveled forward to the expected time
+ eventually(eventuallyTimeout) {
+ assert(clock.currentTime() === oldClockTime)
+ }
+ // Wait for the batches to be recomputed (3 for the time the SSC was down, plus the last pre-failure batch)
+ val numBatchesAfterRestart = 4
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
+ }
+ for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+ writeFile(i, clock)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
+ }
+ }
+ clock.addToTime(batchDuration.milliseconds)
+ logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+ assert(outputStream.output.size > 0, "No files processed after restart")
+ ssc.stop()
+
+ // Verify whether files created while the driver was down (4, 5, 6) and files created after
+ // recovery (7, 8, 9) have been recorded
+ assert(recordedFiles(ssc) === (1 to 9))
+
+ // Append the new output to the old buffer
+ outputBuffer ++= outputStream.output
+
+ // Verify whether all the elements received are as expected
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+ assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+ }
+ } finally {
+ Utils.deleteRecursively(testDir)
+ }
}
@@ -471,12 +521,12 @@ class CheckpointSuite extends TestSuiteBase {
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
for (i <- 1 to numBatches.toInt) {
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.time)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
Thread.sleep(batchDuration.milliseconds)
val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -485,3 +535,7 @@ class CheckpointSuite extends TestSuiteBase {
outputStream.output.map(_.flatten)
}
}
+
+private object CheckpointSuite extends Serializable {
+ var batchThreeShouldBlockIndefinitely: Boolean = true
+}
\ No newline at end of file
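
The batchThreeShouldBlockIndefinitely flag above lives in a companion object so that the closure run by the local-mode executor and the test share one piece of JVM state: it stays true while the first StreamingContext runs, which parks the task mapping record 3, and it is flipped to false before restarting from the checkpoint so the recomputed batch can complete. A self-contained sketch of that shared-flag trick, with invented names and a @volatile added for visibility (an assumption; the suite above relies on local-mode JVM sharing instead):

    object BlockingFlag {
      @volatile var shouldBlock: Boolean = true
    }

    object FlagDemo {
      def process(i: Int): Int = {
        // Park only the special record, as record 3 is parked in the test above.
        if (i == 3) {
          while (BlockingFlag.shouldBlock) { Thread.sleep(10) }
        }
        i
      }

      def main(args: Array[String]): Unit = {
        val worker = new Thread(new Runnable {
          def run(): Unit = println("processed: " + process(3))
        })
        worker.start()
        Thread.sleep(100)                  // record 3 is now blocked, as at shutdown time
        BlockingFlag.shouldBlock = false   // release it, as done before the restart
        worker.join()
      }
    }
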
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a4a9..bddf51e130 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.Files
@@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
def testFileStream(newFilesOnly: Boolean) {
- var ssc: StreamingContext = null
val testDir: File = null
try {
+ val batchDuration = Seconds(2)
val testDir = Utils.createTempDir()
+ // Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
- Thread.sleep(1000)
// Set up the streaming context and input streams
- val newConf = conf.clone.set(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- ssc = new StreamingContext(newConf, batchDuration)
- val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
- testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(fileStream, outputBuffer)
- outputStream.register()
- ssc.start()
-
- // Create files in the directory
- val input = Seq(1, 2, 3, 4, 5)
- input.foreach { i =>
- Thread.sleep(batchDuration.milliseconds)
- val file = new File(testDir, i.toString)
- Files.write(i + "\n", file, Charset.forName("UTF-8"))
- logInfo("Created file " + file)
- }
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+ testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.addToTime(batchDuration.milliseconds / 2)
+
+ // Over time, create files in the directory
+ val input = Seq(1, 2, 3, 4, 5)
+ input.foreach { i =>
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charset.forName("UTF-8"))
+ assert(file.setLastModified(clock.currentTime()))
+ assert(file.lastModified === clock.currentTime)
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
- // Verify that all the files have been read
- val expectedOutput = if (newFilesOnly) {
- input.map(_.toString).toSet
- } else {
- (Seq(0) ++ input).map(_.toString).toSet
- }
- eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+ // Verify that all the files have been read
+ val expectedOutput = if (newFilesOnly) {
+ input.map(_.toString).toSet
+ } else {
+ (Seq(0) ++ input).map(_.toString).toSet
+ }
assert(outputBuffer.flatten.toSet === expectedOutput)
}
} finally {
- if (ssc != null) ssc.stop()
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
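
Both suites now pin each test file's modification time to the ManualClock instead of sleeping until the wall clock catches up, and they assert that setLastModified() actually took effect: on filesystems with coarser timestamp resolution the value can be silently rounded, and the modification-time threshold in FileInputDStream would then misclassify the file. A small sketch of that helper, assuming Guava's Files.write as used above:

    import java.io.File
    import com.google.common.base.Charsets
    import com.google.common.io.Files

    object TimestampedFiles {
      def writeTimestampedFile(dir: File, name: String, contents: String, timestampMs: Long): File = {
        val file = new File(dir, name)
        Files.write(contents, file, Charsets.UTF_8)
        // Fail fast if the filesystem refuses or rounds the timestamp; a silently shifted
        // modification time would make the deterministic new-file checks flaky again.
        require(file.setLastModified(timestampMs), "could not set modification time on " + file)
        require(file.lastModified() == timestampMs, "modification time was rounded for " + file)
        file
      }
    }
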
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f63c6..7d82c3e4aa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SynchronizedBuffer
+import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
+import org.scalatest.concurrent.Eventually.timeout
+import org.scalatest.concurrent.PatienceConfiguration
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
@@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
}
/**
+ * An object that counts the number of started / completed batches. This is implemented using a
+ * StreamingListener. Constructing a new instance automatically registers a StreamingListener on
+ * the given StreamingContext.
+ */
+class BatchCounter(ssc: StreamingContext) {
+
+ // All access to this state should be guarded by `BatchCounter.this.synchronized`
+ private var numCompletedBatches = 0
+ private var numStartedBatches = 0
+
+ private val listener = new StreamingListener {
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+ BatchCounter.this.synchronized {
+ numStartedBatches += 1
+ BatchCounter.this.notifyAll()
+ }
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+ BatchCounter.this.synchronized {
+ numCompletedBatches += 1
+ BatchCounter.this.notifyAll()
+ }
+ }
+ ssc.addStreamingListener(listener)
+
+ def getNumCompletedBatches: Int = this.synchronized {
+ numCompletedBatches
+ }
+
+ def getNumStartedBatches: Int = this.synchronized {
+ numStartedBatches
+ }
+}
+
+/**
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
* to run user-defined set of input on user-defined stream operations, and verify the output.
*/
@@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
.setMaster(master)
.setAppName(framework)
+ // Timeout for use in ScalaTest `eventually` blocks
+ val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
+
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
@@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Advance manual clock
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
if (actuallyWait) {
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
@@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} else {
clock.addToTime(numBatches * batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.time)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()
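
Taken together, BatchCounter and the eventuallyTimeout defined above replace the fixed Thread.sleep calls that made these suites flaky: a test advances the ManualClock by one batch interval and then blocks only until the listener reports that the corresponding batch has actually completed. A hedged usage fragment (it assumes a started StreamingContext named ssc configured with the ManualClock, as in the suites above, so it is not runnable on its own):

    import org.scalatest.concurrent.Eventually._
    import org.apache.spark.streaming.util.ManualClock

    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    val batchCounter = new BatchCounter(ssc)
    clock.addToTime(batchDuration.milliseconds)        // trigger exactly one batch
    eventually(eventuallyTimeout) {
      // Wait until the scheduler has really finished it, rather than sleeping a fixed time
      assert(batchCounter.getNumCompletedBatches === 1)
    }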