aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-05 23:42:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-05 23:42:53 -0800
commitac1f4b06c12dae922172b6fa907eec0ae0bd0170 (patch)
treee00d6fd0a4ecf95bfa582081c0b26c8712419938 /streaming
parent23947945913cafb4f6549167c53a3cdd4a09fef0 (diff)
downloadspark-ac1f4b06c12dae922172b6fa907eec0ae0bd0170.tar.gz
spark-ac1f4b06c12dae922172b6fa907eec0ae0bd0170.tar.bz2
spark-ac1f4b06c12dae922172b6fa907eec0ae0bd0170.zip
Added a hashmap to cache file mod times.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala30
1 files changed, 24 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b4743013b1..0028422db9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,10 +23,10 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+import org.apache.spark.util.TimeStampedHashMap
private[streaming]
@@ -46,6 +46,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private[streaming] var files = new HashMap[Time, Array[String]]
+ @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
+ @transient private var lastNewFileFindingTime = 0L
override def start() {
if (newFilesOnly) {
@@ -96,6 +98,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ // Delete file times that weren't accessed in the last round of getting new files
+ fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
}
/**
@@ -104,8 +108,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
logDebug("Trying to get new files for time " + currentTime)
+ lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
- val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+ val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+ val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+ logInfo("Finding new files took " + timeTaken + " ms")
+ if (timeTaken > slideDuration.milliseconds) {
+ logWarning(
+ "Time taken to find new files exceeds the batch size. " +
+ "Consider increasing the batch size or reducing the number of " +
+ "files in the monitored directory."
+ )
+ }
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
}
@@ -122,16 +136,20 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
new UnionRDD(context.sparkContext, fileRDDs)
}
- private def path: Path = {
+ private def directoryPath: Path = {
if (path_ == null) path_ = new Path(directory)
path_
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
+ if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
fs_
}
+ private def getFileModTime(path: Path) = {
+ fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
+ }
+
private def reset() {
fs_ = null
}
@@ -142,6 +160,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
files = new HashMap[Time, Array[String]]
+ fileModTimes = new TimeStampedHashMap[String, Long](true)
}
/**
@@ -187,14 +206,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
-
def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
- val modTime = fs.getFileStatus(path).getModificationTime()
+ val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
logDebug("Mod time less than last mod time")