From 33597810ec256cd9bd363bad9239cc6d5b707a6f Mon Sep 17 00:00:00 2001 From: mwws Date: Wed, 11 May 2016 10:46:58 +0100 Subject: [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard ## What changes were proposed in this pull request? make StreamingContext.textFileStream support wildcard like /home/user/*/file ## How was this patch tested? I did manual test and added a new unit test case Author: mwws Author: unknown Closes #12752 from mwws/SPARK_FileStream. --- .../apache/spark/streaming/InputStreamsSuite.scala | 62 ++++++++++++++++++++++ 1 file changed, 62 insertions(+) (limited to 'streaming/src/test/scala') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6b4c15f345..00d506c2f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } + test("file input stream - wildcard") { + var testDir: File = null + try { + val batchDuration = Seconds(2) + testDir = Utils.createTempDir() + val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") + val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") + + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, StandardCharsets.UTF_8) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) + + val pathWithWildCard = testDir.toString + "/*/" + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + // monitor "testDir/*/" + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + pathWithWildCard).map(_._2.toString) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputQueue) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.advance(batchDuration.milliseconds / 2) + + def createFileAndAdvenceTime(data: Int, dir: File): Unit = { + val file = new File(testSubDir1, data.toString) + Files.write(data + "\n", file, StandardCharsets.UTF_8) + assert(file.setLastModified(clock.getTimeMillis())) + assert(file.lastModified === clock.getTimeMillis()) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.advance(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === data) + } + } + // Over time, create files in the temp directory 1 + val input1 = Seq(1, 2, 3, 4, 5) + input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1)) + + // Over time, create files in the temp directory 1 + val input2 = Seq(6, 7, 8, 9, 10) + input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2)) + + // Verify that all the files have been read + val expectedOutput = (input1 ++ input2).map(_.toString).toSet + assert(outputQueue.asScala.flatten.toSet === expectedOutput) + } + } finally { + if (testDir != null) Utils.deleteRecursively(testDir) + } + } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 -- cgit v1.2.3