author     mwws <wei.mao@intel.com>          2016-05-11 10:46:58 +0100
committer  Sean Owen <sowen@cloudera.com>    2016-05-11 10:46:58 +0100
commit     33597810ec256cd9bd363bad9239cc6d5b707a6f (patch)
tree       cf762d6c46055351361392cb9723003964e9618f /streaming
parent     8beae59144827d81491eed385dc2aa6aedd6a7b4 (diff)
[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
## What changes were proposed in this pull request?

Make StreamingContext.textFileStream support wildcard paths such as /home/user/*/file.

## How was this patch tested?

I did a manual test and added a new unit test case.

Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>

Closes #12752 from mwws/SPARK_FileStream.
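For context, here is a minimal driver sketch of what the patch enables (the application name, master, and paths are hypothetical, not part of this commit): a wildcard segment in the monitored path now matches several sibling directories, and new files appearing in any of them are streamed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WildcardTextStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WildcardTextStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Hypothetical layout: files land under /data/logs/<host>/.
    // With this patch, one stream can monitor all of those directories.
    val lines = ssc.textFileStream("/data/logs/*/")
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}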
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala  10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala          62
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 36f50e04db..ed9305875c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
- val filter = new PathFilter {
+
+ val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
- val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+ val directoryFilter = new PathFilter {
+ override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
+ }
+ val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+ val newFiles = directories.flatMap(dir =>
+ fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6b4c15f345..00d506c2f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testFileStream(newFilesOnly = false)
}
+ test("file input stream - wildcard") {
+ var testDir: File = null
+ try {
+ val batchDuration = Seconds(2)
+ testDir = Utils.createTempDir()
+ val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
+ val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
+
+ // Create a file that exists before the StreamingContext is created:
+ val existingFile = new File(testDir, "0")
+ Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+ val pathWithWildCard = testDir.toString + "/*/"
+
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ // monitor "testDir/*/"
+ val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+ pathWithWildCard).map(_._2.toString)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputQueue)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.advance(batchDuration.milliseconds / 2)
+
+ def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
+ val file = new File(dir, data.toString)
+ Files.write(data + "\n", file, StandardCharsets.UTF_8)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.advance(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === data)
+ }
+ }
+ // Over time, create files in the temp directory 1
+ val input1 = Seq(1, 2, 3, 4, 5)
+ input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
+
+ // Over time, create files in the temp directory 2
+ val input2 = Seq(6, 7, 8, 9, 10)
+ input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
+
+ // Verify that all the files have been read
+ val expectedOutput = (input1 ++ input2).map(_.toString).toSet
+ assert(outputQueue.asScala.flatten.toSet === expectedOutput)
+ }
+ } finally {
+ if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
+
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10