author	Tathagata Das <tathagata.das1565@gmail.com>	2013-12-23 23:48:43 +0000
committer	Tathagata Das <tathagata.das1565@gmail.com>	2013-12-23 23:48:43 +0000
commit	19d1d58b67a767b227e009ab8723efaa7087dd07 (patch)
tree	b075af9a176d58a49f925902d6e599429a6c81d4 /streaming
parent	e7b62cbfbfdb8fda880548bce4249672c6a0a851 (diff)
Fixed bug in file stream that prevented some files from being read
correctly.
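For context, the sketch below shows how a file stream is typically created and how the slack time added by this patch could be tuned. This is a minimal illustration, not part of the commit: the master URL, batch interval, and HDFS path are hypothetical; only the property name spark.streaming.filestream.slackTime comes from the patch below.

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical setup; the slack time property (in ms) is read via System.getProperty
    // in the patch below. Files younger than the slack time are skipped until a later batch.
    System.setProperty("spark.streaming.filestream.slackTime", "5000")

    val ssc = new StreamingContext("local[2]", "FileStreamExample", Seconds(10))
    val lines = ssc.textFileStream("hdfs://namenode:8020/user/data/incoming")
    lines.count().print()
    ssc.start()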
Diffstat (limited to 'streaming')
-rw-r--r--	streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala	| 21
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index a618a709a7..fb52bcfb67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -128,10 +128,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
- new UnionRDD(
- context.sparkContext,
- files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
- )
+ val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ new UnionRDD(context.sparkContext, fileRDDs)
}
private def path: Path = {
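The filesToRDD change above only names the intermediate sequence of per-file RDDs before unioning them; behavior is unchanged. As a standalone sketch of the same pattern against a plain SparkContext (the textFile call and String element type are illustrative; the commit itself uses newAPIHadoopFile):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.{RDD, UnionRDD}

    // Build one RDD per input file, then combine them into a single RDD.
    def filesToRDD(sc: SparkContext, files: Seq[String]): RDD[String] = {
      val fileRDDs = files.map(file => sc.textFile(file))
      new UnionRDD(sc, fileRDDs)
    }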
@@ -191,9 +189,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
/**
- * PathFilter to find new files that have modification timestamps <= current time, but have not
+ * Custom PathFilter class to find new files that have modification timestamps <= current time, but have not
* been seen before (i.e. the file should not be in lastModTimeFiles)
- * @param currentTime
*/
private[streaming]
class CustomPathFilter(currentTime: Long) extends PathFilter() {
@@ -201,6 +198,12 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
+ // Creating an RDD from an HDFS file immediately after the file is created sometimes returns
+ // an RDD with 0 partitions. To avoid that, we introduce a slack time: only files older
+ // than the slack time (relative to the current time) are considered for processing.
+ val slackTime = System.getProperty("spark.streaming.filestream.slackTime", "2000").toLong
+ val maxModTime = currentTime - slackTime
+
def accept(path: Path): Boolean = {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
@@ -214,9 +217,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
- } else if (modTime > currentTime) {
- logDebug("Mod time more than valid time")
- return false // If the file was created after the time this function call requires
+ } else if (modTime > maxModTime) {
+ logDebug("Mod time more than ")
+ return false // If the file is too new that considering it may give errors
}
if (modTime > latestModTime) {
latestModTime = modTime
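For readability, the acceptance rule after this patch can be condensed as follows. This is a sketch, not the committed code: the names mirror the diff, but the modTime < prevModTime branch is assumed from the surrounding code, which the excerpt cuts off before.

    // Condensed sketch of CustomPathFilter.accept after this patch (assumed first branch).
    def shouldAccept(modTime: Long, pathStr: String,
                     prevModTime: Long, prevModTimeFiles: Set[String],
                     maxModTime: Long): Boolean = {
      if (modTime < prevModTime) false        // older than the newest file already processed
      else if (modTime == prevModTime &&
               prevModTimeFiles.contains(pathStr)) false // same mod time, already considered
      else if (modTime > maxModTime) false    // too new: still inside the slack window
      else true
    }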