aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
author: mwws <wei.mao@intel.com> 2016-05-11 10:46:58 +0100
committer: Sean Owen <sowen@cloudera.com> 2016-05-11 10:46:58 +0100
commit: 33597810ec256cd9bd363bad9239cc6d5b707a6f (patch)
tree: cf762d6c46055351361392cb9723003964e9618f /streaming/src/main
parent: 8beae59144827d81491eed385dc2aa6aedd6a7b4 (diff)
download: spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.gz
spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.bz2
spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.zip
[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
## What changes were proposed in this pull request?
Make StreamingContext.textFileStream support wildcards in the path, e.g. /home/user/*/file.
## How was this patch tested?
I did a manual test and added a new unit test case.
Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>
Closes #12752 from mwws/SPARK_FileStream.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala 10
1 files changed, 8 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 36f50e04db..ed9305875c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
- val filter = new PathFilter {
+
+ val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
- val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+ val directoryFilter = new PathFilter {
+ override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
+ }
+ val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+ val newFiles = directories.flatMap(dir =>
+ fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)