aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-13 19:45:26 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-13 19:45:26 -0800
commita2fee38ee054c7dd6ff5f5d72f036fef54194d53 (patch)
treec70619e76241ff45728469fdcde8490581b012df /streaming
parent01c0d72b322544665c51a9066b870fd723dbd3d2 (diff)
parentc0bb38e8aa36c53e96a64b9bf8d2c8b020e93663 (diff)
downloadspark-a2fee38ee054c7dd6ff5f5d72f036fef54194d53.tar.gz
spark-a2fee38ee054c7dd6ff5f5d72f036fef54194d53.tar.bz2
spark-a2fee38ee054c7dd6ff5f5d72f036fef54194d53.zip
Merge pull request #411 from tdas/filestream-fix
Improved logic of finding new files in FileInputDStream Earlier, if HDFS has a hiccup and reports a existence of a new file (mod time T sec) at time T + 1 sec, then fileStream could have missed that file. With this change, it should be able to find files that are delayed by up to <batch size> seconds. That is, even if file is reported at T + <batch time> sec, file stream should be able to catch it. The new logic, at a high level, is as follows. It keeps track of the new files it found in the previous interval and mod time of the oldest of those files (lets call it X). Then in the current interval, it will ignore those files that were seen in the previous interval and those which have mod time older than X. So if a new file gets reported by HDFS that in the current interval, but has mod time in the previous interval, it will be considered. However, if the mod time earlier than the previous interval (that is, earlier than X), they will be ignored. This is the current limitation, and future version would improve this behavior further. Also reduced line lengths in DStream to <=100 chars.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala49
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala82
2 files changed, 69 insertions, 62 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7c4cca7ea..9dfcc08abe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see
+ * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
+ * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation
+ * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
+ * While a Spark Streaming program is running, each DStream periodically generates a RDD,
+ * either from live data or by transforming the RDD generated by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
+ * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
+ * `join`. These operations are automatically available on any DStream of pairs
+ * (e.g., DStream[(Int, Int)] through implicit conversions when
+ * `org.apache.spark.streaming.StreamingContext._` is imported.
*
* DStreams internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
// Set the minimum value of the rememberDuration if not already set
var minRememberDuration = slideDuration
if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
- minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
+ // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
+ minRememberDuration = checkpointDuration * 2
}
if (rememberDuration == null || rememberDuration < minRememberDuration) {
rememberDuration = minRememberDuration
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
- logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
+ " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
- logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+ logInfo("Persisting RDD " + newRDD.id + " for time " +
+ time + " to " + storageLevel + " at time " + time)
}
- if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ if (checkpointDuration != null &&
+ (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
- logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+ logInfo("Marking RDD " + newRDD.id + " for time " + time +
+ " for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
}
}
} else {
- throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+ throw new java.io.NotSerializableException(
+ "Graph is unexpectedly null when DStream is being serialized.")
}
}
@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
- * Spark's default number of partitions.
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 37c46b26a5..8a6051622e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- // Latest file mod time seen till any point of time
- private val prevModTimeFiles = new HashSet[String]()
- private var prevModTime = 0L
+ // files found in the last interval
+ private val lastFoundFiles = new HashSet[String]
+
+ // Files with mod time earlier than this is ignored. This is updated every interval
+ // such that in the current interval, files older than any file found in the
+ // previous interval will be ignored. Obviously this time keeps moving forward.
+ private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
+ // Latest file mod time seen till any point of time
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private[streaming] var files = new HashMap[Time, Array[String]]
@transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
@transient private var lastNewFileFindingTime = 0L
- override def start() {
- if (newFilesOnly) {
- prevModTime = graph.zeroTime.milliseconds
- } else {
- prevModTime = 0
- }
- logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
- }
+ override def start() { }
override def stop() { }
@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
- assert(validTime.milliseconds >= prevModTime,
- "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
+ assert(validTime.milliseconds >= ignoreTime,
+ "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
// Find new files
- val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
+ val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
- if (newFiles.length > 0) {
- // Update the modification time and the files processed for that modification time
- if (prevModTime < latestModTime) {
- prevModTime = latestModTime
- prevModTimeFiles.clear()
- }
- prevModTimeFiles ++= latestModTimeFiles
- logDebug("Last mod time updated to " + prevModTime)
+ if (!newFiles.isEmpty) {
+ lastFoundFiles.clear()
+ lastFoundFiles ++= newFiles
+ ignoreTime = minNewFileModTime
}
files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
- val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ val oldFiles = files.filter(_._1 < (time - rememberDuration))
files --= oldFiles.keys
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* Find files which have modification timestamp <= current time and return a 3-tuple of
* (new files found, latest modification time among them, files with latest modification time)
*/
- private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+ private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
logDebug("Trying to get new files for time " + currentTime)
lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
"files in the monitored directory."
)
}
- (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ (newFiles, filter.minNewFileModTime)
}
/** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
/**
- * Custom PathFilter class to find new files that have modification timestamps <= current time,
- * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+ * Custom PathFilter class to find new files that
+ * ... have modification time more than ignore time
+ * ... have not been seen in the last interval
+ * ... have modification time less than maxModTime
*/
private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter {
- // Latest file mod time seen in this round of fetching files and its corresponding files
- var latestModTime = 0L
- val latestModTimeFiles = new HashSet[String]()
+
+ // Minimum of the mod times of new files found in the current interval
+ var minNewFileModTime = -1L
+
def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
+ // Reject file if it was found in the last interval
+ if (lastFoundFiles.contains(path.toString)) {
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false
+ }
val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)
- if (modTime < prevModTime) {
- logDebug("Mod time less than last mod time")
- return false // If the file was created before the last time it was called
- } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
- logDebug("Mod time equal to last mod time, but file considered already")
- return false // If the file was created exactly as lastModTime but not reported yet
+ if (modTime < ignoreTime) {
+ // Reject file if it was created before the ignore time (or, before last interval)
+ logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
+ return false
} else if (modTime > maxModTime) {
+ // Reject file if it is too new that considering it may give errors
logDebug("Mod time more than ")
- return false // If the file is too new that considering it may give errors
+ return false
}
- if (modTime > latestModTime) {
- latestModTime = modTime
- latestModTimeFiles.clear()
- logDebug("Latest mod time updated to " + latestModTime)
+ if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
+ minNewFileModTime = modTime
}
- latestModTimeFiles += path.toString
logDebug("Accepted " + path)
} catch {
case fnfe: java.io.FileNotFoundException =>