diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-11 13:20:09 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-11 13:20:09 -0800 |
commit | 46222dc56db4a521bd613bd3fac5b91868bb339e (patch) | |
tree | 8f0282c0bd409c300519fbec9af294bf706df6ca /streaming/src/main | |
parent | 04e9e9d93c512f856116bc2c99c35dfb48b4adee (diff) | |
download | spark-46222dc56db4a521bd613bd3fac5b91868bb339e.tar.gz spark-46222dc56db4a521bd613bd3fac5b91868bb339e.tar.bz2 spark-46222dc56db4a521bd613bd3fac5b91868bb339e.zip |
Fixed bug in FileInputDStream that allowed it to miss new files. Added tests in the InputStreamsSuite to test checkpointing of file and network streams.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/FileInputDStream.scala | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala index 9d7361097b..88856364d2 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala @@ -6,7 +6,8 @@ import spark.rdd.UnionRDD import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import java.io.{ObjectInputStream, IOException} + +import scala.collection.mutable.HashSet class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - var lastModTime: Long = 0 + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() def path(): Path = { if (path_ == null) path_ = new Path(directory) @@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } override def stop() { } - + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. + */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files val newFilter = new PathFilter() { var latestModTime = 0L - + val latestModTimeFiles = new HashSet[String]() + def accept(path: Path): Boolean = { if (!filter.accept(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime <= lastModTime) { + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { return false } if (modTime > latestModTime) { latestModTime = modTime + latestModTimeFiles.clear() } + latestModTimeFiles += path.toString return true } } @@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K val newFiles = fs.listStatus(path, newFilter) logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) if (newFiles.length > 0) { - lastModTime = newFilter.latestModTime + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles } val newRDD = new UnionRDD(ssc.sc, newFiles.map( file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) |