diff options
author | mwws <wei.mao@intel.com> | 2016-05-11 10:46:58 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-05-11 10:46:58 +0100 |
commit | 33597810ec256cd9bd363bad9239cc6d5b707a6f (patch) | |
tree | cf762d6c46055351361392cb9723003964e9618f | |
parent | 8beae59144827d81491eed385dc2aa6aedd6a7b4 (diff) | |
download | spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.gz spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.bz2 spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.zip |
[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
## What changes were proposed in this pull request?
make StreamingContext.textFileStream support wildcard
like /home/user/*/file
## How was this patch tested?
I did manual test and added a new unit test case
Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>
Closes #12752 from mwws/SPARK_FileStream.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 10 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 62 |
2 files changed, 70 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 36f50e04db..ed9305875c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( ) logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val filter = new PathFilter { + + val newFileFilter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val directoryFilter = new PathFilter { + override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory + } + val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) + val newFiles = directories.flatMap(dir => + fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6b4c15f345..00d506c2f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } + test("file input stream - wildcard") { + var testDir: File = null + try { + val batchDuration = Seconds(2) + testDir = Utils.createTempDir() + val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") + val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") + + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, StandardCharsets.UTF_8) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) + + val pathWithWildCard = testDir.toString + "/*/" + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + // monitor "testDir/*/" + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + pathWithWildCard).map(_._2.toString) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputQueue) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.advance(batchDuration.milliseconds / 2) + + def createFileAndAdvenceTime(data: Int, dir: File): Unit = { + val file = new File(testSubDir1, data.toString) + Files.write(data + "\n", file, StandardCharsets.UTF_8) + assert(file.setLastModified(clock.getTimeMillis())) + assert(file.lastModified === clock.getTimeMillis()) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.advance(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === data) + } + } + // Over time, create files in the temp directory 1 + val input1 = Seq(1, 2, 3, 4, 5) + input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1)) + + // Over time, create files in the temp directory 1 + val input2 = Seq(6, 7, 8, 9, 10) + input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2)) + + // Verify that all the files have been read + val expectedOutput = (input1 ++ input2).map(_.toString).toSet + assert(outputQueue.asScala.flatten.toSet === expectedOutput) + } + } finally { + if (testDir != null) Utils.deleteRecursively(testDir) + } + } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 |