From 19d1d58b67a767b227e009ab8723efaa7087dd07 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Dec 2013 23:48:43 +0000 Subject: Fixed bug in file stream that prevented some files from being read correctly. --- .../spark/streaming/dstream/FileInputDStream.scala | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index a618a709a7..fb52bcfb67 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -128,10 +128,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas /** Generate one RDD from an array of files */ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { - new UnionRDD( - context.sparkContext, - files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) - ) + val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) + new UnionRDD(context.sparkContext, fileRDDs) } private def path: Path = { @@ -191,9 +189,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** - * PathFilter to find new files that have modification timestamps <= current time, but have not + * Custom PathFilter class to find new files that have modification timestamps <= current time, but have not * been seen before (i.e. 
the file should not be in lastModTimeFiles) - * @param currentTime */ private[streaming] class CustomPathFilter(currentTime: Long) extends PathFilter() { @@ -201,6 +198,12 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() + // Creating an RDD from an HDFS file immediately after the file is created sometimes returns + // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older + // than slack time from current time are considered for processing. + val slackTime = System.getProperty("spark.streaming.filestream.slackTime", "2000").toLong + val maxModTime = currentTime - slackTime + def accept(path: Path): Boolean = { if (!filter(path)) { // Reject file if it does not satisfy filter logDebug("Rejected by filter " + path) @@ -214,9 +217,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) { logDebug("Mod time equal to last mod time, but file considered already") return false // If the file was created exactly as lastModTime but not reported yet - } else if (modTime > currentTime) { logDebug("Mod time more than valid time") return false // If the file was created after the time this function call requires + } else if (modTime > maxModTime) { logDebug("Mod time more than ") return false // If the file is too new, considering it may cause errors } if (modTime > latestModTime) { latestModTime = modTime -- cgit v1.2.3