aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/org/apache')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala62
1 files changed, 62 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6b4c15f345..00d506c2f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testFileStream(newFilesOnly = false)
}
+ test("file input stream - wildcard") {
+   var testDir: File = null
+   try {
+     val batchDuration = Seconds(2)
+     testDir = Utils.createTempDir()
+     val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
+     val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
+
+     // Create a file that exists before the StreamingContext is created:
+     val existingFile = new File(testDir, "0")
+     Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+     assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+     // existingFile sits directly in testDir; its "0" is not part of the expected output below
+     val pathWithWildCard = testDir.toString + "/*/"
+
+     // Set up the streaming context and input streams
+     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+       clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+       val batchCounter = new BatchCounter(ssc)
+       // monitor "testDir/*/"
+       val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+         pathWithWildCard).map(_._2.toString)
+       val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+       val outputStream = new TestOutputStream(fileStream, outputQueue)
+       outputStream.register()
+       ssc.start()
+
+       // Advance the clock so that the files are created after StreamingContext starts, but
+       // not enough to trigger a batch
+       clock.advance(batchDuration.milliseconds / 2)
+       // Write `data` to a new file in `dir`, advance one batch, and wait for it to complete
+       def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
+         val file = new File(dir, data.toString)
+         Files.write(data + "\n", file, StandardCharsets.UTF_8)
+         assert(file.setLastModified(clock.getTimeMillis()))
+         assert(file.lastModified === clock.getTimeMillis())
+         logInfo("Created file " + file)
+         // Advance the clock after creating the file to avoid a race when setting its
+         // modification time; `data` doubles as the expected number of completed batches
+         clock.advance(batchDuration.milliseconds)
+         eventually(eventuallyTimeout) {
+           assert(batchCounter.getNumCompletedBatches === data)
+         }
+       }
+       // Over time, create files in the temp directory 1
+       val input1 = Seq(1, 2, 3, 4, 5)
+       input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
+
+       // Over time, create files in the temp directory 2
+       val input2 = Seq(6, 7, 8, 9, 10)
+       input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
+
+       // Verify that all the files have been read
+       val expectedOutput = (input1 ++ input2).map(_.toString).toSet
+       assert(outputQueue.asScala.flatten.toSet === expectedOutput)
+     }
+   } finally {
+     if (testDir != null) Utils.deleteRecursively(testDir)
+   }
+ }
+
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10