diff options
author | mwws <wei.mao@intel.com> | 2016-05-11 10:46:58 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-05-11 10:46:58 +0100 |
commit | 33597810ec256cd9bd363bad9239cc6d5b707a6f (patch) | |
tree | cf762d6c46055351361392cb9723003964e9618f /streaming/src/main | |
parent | 8beae59144827d81491eed385dc2aa6aedd6a7b4 (diff) | |
download | spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.gz spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.tar.bz2 spark-33597810ec256cd9bd363bad9239cc6d5b707a6f.zip |
[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
## What changes were proposed in this pull request?
make StreamingContext.textFileStream support wildcard
like /home/user/*/file
## How was this patch tested?
I did manual test and added a new unit test case
Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>
Closes #12752 from mwws/SPARK_FileStream.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 36f50e04db..ed9305875c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( ) logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val filter = new PathFilter { + + val newFileFilter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val directoryFilter = new PathFilter { + override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory + } + val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) + val newFiles = directories.flatMap(dir => + fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) |