aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authortdas <tathagata.das1565@gmail.com>2012-11-11 22:56:14 +0000
committertdas <tathagata.das1565@gmail.com>2012-11-11 22:56:14 +0000
commit052d0b800ffe1bcfddc33a6fb3ad71e169b219bb (patch)
tree21f6c62356ea0c07395a39b12f462a31497055f8 /streaming/src/main
parent52d21cb682d1c4ca05e6823f8049ccedc3c5530c (diff)
parent46222dc56db4a521bd613bd3fac5b91868bb339e (diff)
downloadspark-052d0b800ffe1bcfddc33a6fb3ad71e169b219bb.tar.gz
spark-052d0b800ffe1bcfddc33a6fb3ad71e169b219bb.tar.bz2
spark-052d0b800ffe1bcfddc33a6fb3ad71e169b219bb.zip
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/spark/streaming/FileInputDStream.scala34
1 files changed, 28 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 9d7361097b..88856364d2 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -6,7 +6,8 @@ import spark.rdd.UnionRDD
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import java.io.{ObjectInputStream, IOException}
+
+import scala.collection.mutable.HashSet
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- var lastModTime: Long = 0
+ var lastModTime = 0L
+ val lastModTimeFiles = new HashSet[String]()
def path(): Path = {
if (path_ == null) path_ = new Path(directory)
@@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
override def stop() { }
-
+
+ /**
+ * Finds the files that were modified since the last time this method was called and makes
+ * a union RDD out of them. Note that this maintains the list of files that were processed
+ * in the latest modification time in the previous call to this method. This is because the
+ * modification time returned by the FileStatus API seems to return times only at the
+ * granularity of seconds. Hence, new files may have the same modification time as the
+ * latest modification time in the previous call to this method and the list of files
+ maintained is used to filter the ones that have already been processed.
+ */
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ // Create the filter for selecting new files
val newFilter = new PathFilter() {
var latestModTime = 0L
-
+ val latestModTimeFiles = new HashSet[String]()
+
def accept(path: Path): Boolean = {
if (!filter.accept(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime <= lastModTime) {
+ if (modTime < lastModTime){
+ return false
+ } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
+ latestModTimeFiles.clear()
}
+ latestModTimeFiles += path.toString
return true
}
}
@@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val newFiles = fs.listStatus(path, newFilter)
logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
if (newFiles.length > 0) {
- lastModTime = newFilter.latestModTime
+ // Update the modification time and the files processed for that modification time
+ if (lastModTime != newFilter.latestModTime) {
+ lastModTime = newFilter.latestModTime
+ lastModTimeFiles.clear()
+ }
+ lastModTimeFiles ++= newFilter.latestModTimeFiles
}
val newRDD = new UnionRDD(ssc.sc, newFiles.map(
file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))