author     Tathagata Das <tathagata.das1565@gmail.com>  2013-12-26 10:18:46 +0000
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-12-26 10:18:46 +0000
commit     bacc65cf28b9f95b129e9adede43f684f2c5ced3 (patch)
tree       2ba370be80abcb7e462e3c890b7db348b23696fd /streaming
parent     d4dfab503a9222b5acf5c4bf69b91c16f298e4aa (diff)
Removed slack time in file stream and added better handling of failures caused by FileNotFoundExceptions.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala  59
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala            6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala          6
3 files changed, 21 insertions, 50 deletions
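
The main behavioral change is in findNewFiles: instead of retrying a failed directory listing up to MAX_ATTEMPTS times, the stream now makes a single attempt, logs the error, resets its state, and returns an empty batch. Below is a minimal, self-contained Scala sketch of that control-flow change; listFiles and reset are hypothetical stand-ins for fs.listStatus(path, filter) and the stream's internal state reset, not the real Spark internals.

import java.io.IOException

object FindNewFilesSketch {
  // Hypothetical stand-ins for fs.listStatus(path, filter) and the
  // DStream's internal state reset; not the actual Spark code.
  def listFiles(currentTime: Long): Seq[String] = Seq.empty
  def reset(): Unit = ()

  // Old behavior: retry the listing on IOException, up to MAX_ATTEMPTS times.
  val MAX_ATTEMPTS = 10
  def findNewFilesWithRetry(currentTime: Long): Seq[String] = {
    var attempts = 0
    while (attempts < MAX_ATTEMPTS) {
      attempts += 1
      try {
        return listFiles(currentTime)
      } catch {
        case ioe: IOException => reset()
      }
    }
    Seq.empty
  }

  // New behavior: a single attempt; on any exception the state is reset and
  // the batch simply sees no new files instead of blocking on retries.
  def findNewFilesOnce(currentTime: Long): Seq[String] = {
    try {
      listFiles(currentTime)
    } catch {
      case e: Exception =>
        reset()
        Seq.empty
    }
  }
}

The upshot is that a transient listing failure now costs one empty batch rather than up to ten blocking retries inside the same batch interval.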
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index d6514a1fb1..b163b13a09 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -40,9 +40,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- // Max attempts to try if listing files fail
- val MAX_ATTEMPTS = 10
-
// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
@@ -109,19 +106,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* (new files found, latest modification time among them, files with latest modification time)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
- logDebug("Trying to get new files for time " + currentTime)
- var attempts = 0
- while (attempts < MAX_ATTEMPTS) {
- attempts += 1
- try {
- val filter = new CustomPathFilter(currentTime)
- val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
- return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
- } catch {
- case ioe: IOException =>
- logWarning("Attempt " + attempts + " to get new files failed", ioe)
- reset()
- }
+ try {
+ logDebug("Trying to get new files for time " + currentTime)
+ val filter = new CustomPathFilter(currentTime)
+ val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+ return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ } catch {
+ case e: Exception =>
+ logError("Attempt to get new files failed", e)
+ reset()
}
(Seq.empty, -1, Seq.empty)
}
@@ -193,22 +186,17 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* been seen before (i.e. the file should not be in lastModTimeFiles)
*/
private[streaming]
- class CustomPathFilter(currentTime: Long) extends PathFilter {
+ class CustomPathFilter(maxModTime: Long) extends PathFilter {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
- // Creating an RDD from a HDFS file immediately after the file is created sometime returns
- // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older
- // than slack time from current time is considered for processing.
- val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong
- val maxModTime = currentTime - slackTime
-
def accept(path: Path): Boolean = {
- if (!filter(path)) { // Reject file if it does not satisfy filter
- logDebug("Rejected by filter " + path)
- return false
- } else { // Accept file only if
+ try {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
+ return false
+ }
val modTime = fs.getFileStatus(path).getModificationTime()
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
@@ -228,8 +216,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
- return true
+ } catch {
+ case fnfe: java.io.FileNotFoundException =>
+ logWarning("Error finding new files", fnfe)
+ reset()
+ return false
}
+ return true
}
}
}
@@ -237,14 +230,4 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-
- // Disable slack time (i.e. set it to zero)
- private[streaming] def disableSlackTime() {
- System.setProperty("spark.streaming.fileStream.slackTime", "0")
- }
-
- // Restore default value of slack time
- private[streaming] def restoreSlackTime() {
- System.clearProperty("spark.streaming.fileStream.slackTime")
- }
}
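
The CustomPathFilter change guards a race in accept: a file can be deleted between the directory listing and the fs.getFileStatus call, so the stat is now wrapped in a try/catch and a vanished file is rejected instead of failing the whole scan. A small illustrative sketch of the same pattern, written against java.nio purely for illustration (the real code uses the Hadoop FileSystem API):

import java.io.FileNotFoundException
import java.nio.file.{Files, NoSuchFileException, Path, Paths}

object VanishingFileSketch {
  // Accept a file only if its modification time is still readable and recent
  // enough; if the file disappears mid-scan, skip it rather than throwing.
  def accept(path: Path, minModTime: Long): Boolean = {
    try {
      Files.getLastModifiedTime(path).toMillis >= minModTime
    } catch {
      case _: FileNotFoundException | _: NoSuchFileException =>
        false // file was deleted between listing and stat; reject it
    }
  }

  def main(args: Array[String]): Unit = {
    println(accept(Paths.get("/tmp/does-not-exist"), 0L)) // prints false
  }
}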
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0347cc1032..4e25c9566c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -200,9 +200,6 @@ class CheckpointSuite extends TestSuiteBase {
val clockProperty = System.getProperty("spark.streaming.clock")
System.clearProperty("spark.streaming.clock")
- // Disable slack time of file stream when testing with local file system
- FileInputDStream.disableSlackTime()
-
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -303,9 +300,6 @@ class CheckpointSuite extends TestSuiteBase {
// Enable manual clock back again for other tests
if (clockProperty != null)
System.setProperty("spark.streaming.clock", clockProperty)
-
- // Restore the default slack time
- FileInputDStream.restoreSlackTime()
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index e506c954ac..5fa14ad7c4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -152,9 +152,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
- // Disable slack time of file stream when testing with local file system
- FileInputDStream.disableSlackTime()
-
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
@@ -199,9 +196,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enable manual clock back again for other tests
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-
- // Restore the default slack time
- FileInputDStream.restoreSlackTime()
}
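
With the slack-time workaround removed, a file stream can pick up a file as soon as it is listed, so applications should make files visible atomically, e.g. write to a temporary location and rename into the watched directory. A minimal usage sketch, assuming a local master and a hypothetical watched directory; textFileStream is the public API backed by the FileInputDStream patched above.

import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "FileStreamExample", Seconds(1))
    // textFileStream is backed by FileInputDStream; files should be moved
    // into this directory atomically (e.g. via rename) to avoid half-writes.
    val lines = ssc.textFileStream("/tmp/watched-dir")
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}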