author    Tathagata Das <tathagata.das1565@gmail.com>    2014-01-13 16:54:52 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2014-01-13 16:54:52 -0800
commit    c0bb38e8aa36c53e96a64b9bf8d2c8b020e93663 (patch)
tree      e857ce08bcf7a87953a381470d5cb08fa976b817 /streaming
parent    b93f9d42f21f03163734ef97b2871db945e166da (diff)
Improved file input stream further.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala           49
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala  82
2 files changed, 69 insertions, 62 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7c4cca7ea..9dfcc08abe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see
+ * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
+ * live data (such as data from HDFS, Kafka or Flume) or they can be generated by transforming
+ * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
+ * While a Spark Streaming program is running, each DStream periodically generates an RDD,
+ * either from live data or by transforming the RDD generated by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
+ * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
+ * `join`. These operations are automatically available on any DStream of pairs
+ * (e.g., DStream[(Int, Int)]) through implicit conversions when
+ * `org.apache.spark.streaming.StreamingContext._` is imported.
*
* DStreams internally are characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
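As a hedged illustration of the API described in the scaladoc above (not part of this patch): a minimal word-count sketch that creates a DStream from live data, transforms it with `flatMap`/`map`, and uses a pair operation made available by the implicit conversions from `org.apache.spark.streaming.StreamingContext._`. The host name, port, and object name are illustrative only.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // pulls in PairDStreamFunctions

    object DStreamSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("DStreamSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))

        val lines = ssc.socketTextStream("localhost", 9999)       // DStream created from live data
        val words = lines.flatMap(_.split(" "))                   // transformation of an existing DStream
        val counts = words.map(w => (w, 1))                       // DStream[(String, Int)]
          .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))  // pair operation via implicit conversion

        counts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }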
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
// Set the minimum value of the rememberDuration if not already set
var minRememberDuration = slideDuration
if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
- minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
+ // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
+ minRememberDuration = checkpointDuration * 2
}
if (rememberDuration == null || rememberDuration < minRememberDuration) {
rememberDuration = minRememberDuration
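A hedged, REPL-style worked instance of the rule just above, with illustrative durations: a slide duration of 1 second and a checkpoint duration of 10 seconds give a minimum remember duration of 20 seconds, so the RDDs backing the most recent checkpoint are still remembered.

    import org.apache.spark.streaming.Seconds

    val slideDuration      = Seconds(1)
    val checkpointDuration = Seconds(10)

    var minRememberDuration = slideDuration
    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
      minRememberDuration = checkpointDuration * 2  // Seconds(20)
    }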
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
- logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
+ " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
- logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+ logInfo("Persisting RDD " + newRDD.id + " for time " +
+ time + " to " + storageLevel + " at time " + time)
}
- if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ if (checkpointDuration != null &&
+ (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
- logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+ logInfo("Marking RDD " + newRDD.id + " for time " + time +
+ " for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
}
}
} else {
- throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+ throw new java.io.NotSerializableException(
+ "Graph is unexpectedly null when DStream is being serialized.")
}
}
@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
- * Spark's default number of partitions.
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
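A hedged illustration (the millisecond values are made up) of the alignment performed by `slice` above: request times that are not multiples of the slide duration only trigger the warnings; the RDD lookup then uses the floored, aligned times.

    import org.apache.spark.streaming.{Seconds, Time}

    val slide = Seconds(10)
    val alignedFromTime = Time(15000).floor(slide)  // Time(10000 ms)
    val alignedToTime   = Time(47000).floor(slide)  // Time(40000 ms)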
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 37c46b26a5..8a6051622e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- // Latest file mod time seen till any point of time
- private val prevModTimeFiles = new HashSet[String]()
- private var prevModTime = 0L
+ // files found in the last interval
+ private val lastFoundFiles = new HashSet[String]
+
+ // Files with mod time earlier than this are ignored. This is updated every interval
+ // so that, in the current interval, files older than the oldest file found in the
+ // previous interval are ignored. Obviously this time keeps moving forward.
+ private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
+ // Latest file mod time seen till any point of time
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private[streaming] var files = new HashMap[Time, Array[String]]
@transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
@transient private var lastNewFileFindingTime = 0L
- override def start() {
- if (newFilesOnly) {
- prevModTime = graph.zeroTime.milliseconds
- } else {
- prevModTime = 0
- }
- logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
- }
+ override def start() { }
override def stop() { }
@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
- assert(validTime.milliseconds >= prevModTime,
- "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
+ assert(validTime.milliseconds >= ignoreTime,
+ "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
// Find new files
- val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
+ val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
- if (newFiles.length > 0) {
- // Update the modification time and the files processed for that modification time
- if (prevModTime < latestModTime) {
- prevModTime = latestModTime
- prevModTimeFiles.clear()
- }
- prevModTimeFiles ++= latestModTimeFiles
- logDebug("Last mod time updated to " + prevModTime)
+ if (!newFiles.isEmpty) {
+ lastFoundFiles.clear()
+ lastFoundFiles ++= newFiles
+ ignoreTime = minNewFileModTime
}
files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
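To make the new bookkeeping concrete, here is a hedged, self-contained sketch (not the patch itself; the object name and the `candidates` input are illustrative) of the interval-to-interval logic that `lastFoundFiles`, `ignoreTime`, and `minNewFileModTime` implement together: a file is picked up if its mod time is at least `ignoreTime`, at most the current batch time, and it was not already found in the last interval; `ignoreTime` then advances to the smallest mod time among the newly found files.

    import scala.collection.mutable.HashSet

    object FileTrackingSketch {
      private val lastFoundFiles = new HashSet[String]
      private var ignoreTime = 0L

      /** candidates: (path, modTime) pairs visible in the monitored directory this interval */
      def findNewFiles(candidates: Seq[(String, Long)], maxModTime: Long): Seq[String] = {
        val accepted = candidates.filter { case (path, modTime) =>
          modTime >= ignoreTime &&          // not older than the oldest file found last interval
          modTime <= maxModTime &&          // not newer than the current batch time
          !lastFoundFiles.contains(path)    // not already picked up in the last interval
        }
        if (accepted.nonEmpty) {
          lastFoundFiles.clear()
          lastFoundFiles ++= accepted.map(_._1)
          ignoreTime = accepted.map(_._2).min  // threshold only moves forward
        }
        accepted.map(_._1)
      }
    }

The apparent gain over the old `prevModTime`/`prevModTimeFiles` scheme is that remembering every file from the previous interval, rather than only the files at the single latest mod time, tolerates files whose mod times land slightly out of order within an interval.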
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
- val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ val oldFiles = files.filter(_._1 < (time - rememberDuration))
files --= oldFiles.keys
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* Find files which have modification timestamp <= current time and return a 2-tuple of
* (new files found, minimum modification time among them)
*/
- private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+ private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
logDebug("Trying to get new files for time " + currentTime)
lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
"files in the monitored directory."
)
}
- (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ (newFiles, filter.minNewFileModTime)
}
/** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
/**
- * Custom PathFilter class to find new files that have modification timestamps <= current time,
- * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+ * Custom PathFilter class to find new files that
+ * ... have modification time more than ignore time
+ * ... have not been seen in the last interval
+ * ... have modification time less than maxModTime
*/
private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter {
- // Latest file mod time seen in this round of fetching files and its corresponding files
- var latestModTime = 0L
- val latestModTimeFiles = new HashSet[String]()
+
+ // Minimum of the mod times of new files found in the current interval
+ var minNewFileModTime = -1L
+
def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
+ // Reject file if it was found in the last interval
+ if (lastFoundFiles.contains(path.toString)) {
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false
+ }
val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)
- if (modTime < prevModTime) {
- logDebug("Mod time less than last mod time")
- return false // If the file was created before the last time it was called
- } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
- logDebug("Mod time equal to last mod time, but file considered already")
- return false // If the file was created exactly as lastModTime but not reported yet
+ if (modTime < ignoreTime) {
+ // Reject file if it was created before the ignore time (or, before last interval)
+ logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
+ return false
} else if (modTime > maxModTime) {
+ // Reject file if it is so new that considering it may give errors
logDebug("Mod time " + modTime + " more than max mod time " + maxModTime)
- return false // If the file is too new that considering it may give errors
+ return false
}
- if (modTime > latestModTime) {
- latestModTime = modTime
- latestModTimeFiles.clear()
- logDebug("Latest mod time updated to " + latestModTime)
+ if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
+ minNewFileModTime = modTime
}
- latestModTimeFiles += path.toString
logDebug("Accepted " + path)
} catch {
case fnfe: java.io.FileNotFoundException =>