diff options
Diffstat (limited to 'streaming/src')
3 files changed, 21 insertions, 50 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index d6514a1fb1..b163b13a09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -40,9 +40,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData - // Max attempts to try if listing files fail - val MAX_ATTEMPTS = 10 - // Latest file mod time seen till any point of time private val prevModTimeFiles = new HashSet[String]() private var prevModTime = 0L @@ -109,19 +106,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * (new files found, latest modification time among them, files with latest modification time) */ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { - logDebug("Trying to get new files for time " + currentTime) - var attempts = 0 - while (attempts < MAX_ATTEMPTS) { - attempts += 1 - try { - val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) - return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) - } catch { - case ioe: IOException => - logWarning("Attempt " + attempts + " to get new files failed", ioe) - reset() - } + try { + logDebug("Trying to get new files for time " + currentTime) + val filter = new CustomPathFilter(currentTime) + val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) + } catch { + case e: Exception => + logError("Attempt to get new files failed", e) + reset() } (Seq.empty, -1, Seq.empty) } @@ -193,22 +186,17 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * been seen before (i.e. the file should not be in lastModTimeFiles) */ private[streaming] - class CustomPathFilter(currentTime: Long) extends PathFilter { + class CustomPathFilter(maxModTime: Long) extends PathFilter { // Latest file mod time seen in this round of fetching files and its corresponding files var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() - // Creating an RDD from a HDFS file immediately after the file is created sometime returns - // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older - // than slack time from current time is considered for processing. - val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong - val maxModTime = currentTime - slackTime - def accept(path: Path): Boolean = { - if (!filter(path)) { // Reject file if it does not satisfy filter - logDebug("Rejected by filter " + path) - return false - } else { // Accept file only if + try { + if (!filter(path)) { // Reject file if it does not satisfy filter + logDebug("Rejected by filter " + path) + return false + } val modTime = fs.getFileStatus(path).getModificationTime() logDebug("Mod time for " + path + " is " + modTime) if (modTime < prevModTime) { @@ -228,8 +216,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } latestModTimeFiles += path.toString logDebug("Accepted " + path) - return true + } catch { + case fnfe: java.io.FileNotFoundException => + logWarning("Error finding new files", fnfe) + reset() + return false } + return true } } } @@ -237,14 +230,4 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private[streaming] object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") - - // Disable slack time (i.e. set it to zero) - private[streaming] def disableSlackTime() { - System.setProperty("spark.streaming.fileStream.slackTime", "0") - } - - // Restore default value of slack time - private[streaming] def restoreSlackTime() { - System.clearProperty("spark.streaming.fileStream.slackTime") - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0347cc1032..4e25c9566c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -200,9 +200,6 @@ class CheckpointSuite extends TestSuiteBase { val clockProperty = System.getProperty("spark.streaming.clock") System.clearProperty("spark.streaming.clock") - // Disable slack time of file stream when testing with local file system - FileInputDStream.disableSlackTime() - // Set up the streaming context and input streams val testDir = Files.createTempDir() var ssc = new StreamingContext(master, framework, Seconds(1)) @@ -303,9 +300,6 @@ class CheckpointSuite extends TestSuiteBase { // Enable manual clock back again for other tests if (clockProperty != null) System.setProperty("spark.streaming.clock", clockProperty) - - // Restore the default slack time - FileInputDStream.restoreSlackTime() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index e506c954ac..5fa14ad7c4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -152,9 +152,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Disable manual clock as FileInputDStream does not work with manual clock System.clearProperty("spark.streaming.clock") - // Disable slack time of file stream when testing with local file system - FileInputDStream.disableSlackTime() - // Set up the streaming context and input streams val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) @@ -199,9 +196,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Enable manual clock back again for other tests System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - - // Restore the default slack time - FileInputDStream.restoreSlackTime() } |