aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala
diff options
context:
space:
mode:
authormwws <wei.mao@intel.com>2016-05-11 10:46:58 +0100
committerSean Owen <sowen@cloudera.com>2016-05-11 10:46:58 +0100
commit33597810ec256cd9bd363bad9239cc6d5b707a6f (patch)
treecf762d6c46055351361392cb9723003964e9618f /streaming/src/test/scala
parent8beae59144827d81491eed385dc2aa6aedd6a7b4 (diff)
downloadspark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.gz
spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.bz2
spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.zip
[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
## What changes were proposed in this pull request? make StreamingContext.textFileStream support wildcard like /home/user/*/file ## How was this patch tested? I did manual test and added a new unit test case Author: mwws <wei.mao@intel.com> Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com> Closes #12752 from mwws/SPARK_FileStream.
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala62
1 files changed, 62 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6b4c15f345..00d506c2f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testFileStream(newFilesOnly = false)
}
+ test("file input stream - wildcard") {
+ var testDir: File = null
+ try {
+ val batchDuration = Seconds(2)
+ testDir = Utils.createTempDir()
+ val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
+ val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
+
+ // Create a file that exists before the StreamingContext is created:
+ val existingFile = new File(testDir, "0")
+ Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+ val pathWithWildCard = testDir.toString + "/*/"
+
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ // monitor "testDir/*/"
+ val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+ pathWithWildCard).map(_._2.toString)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputQueue)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.advance(batchDuration.milliseconds / 2)
+
+ def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
+ val file = new File(testSubDir1, data.toString)
+ Files.write(data + "\n", file, StandardCharsets.UTF_8)
+ assert(file.setLastModified(clock.getTimeMillis()))
+ assert(file.lastModified === clock.getTimeMillis())
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.advance(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === data)
+ }
+ }
+ // Over time, create files in the temp directory 1
+ val input1 = Seq(1, 2, 3, 4, 5)
+ input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+
+ // Over time, create files in the temp directory 1
+ val input2 = Seq(6, 7, 8, 9, 10)
+ input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+
+ // Verify that all the files have been read
+ val expectedOutput = (input1 ++ input2).map(_.toString).toSet
+ assert(outputQueue.asScala.flatten.toSet === expectedOutput)
+ }
+ } finally {
+ if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
+
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10