author     Tathagata Das <tathagata.das1565@gmail.com>    2014-11-24 13:50:20 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-11-24 13:50:20 -0800
commit     cb0e9b0980f38befe88bf52aa037fe33262730f7 (patch)
tree       730da28c128641d8b9f82f0a8bb93e0c4c2d6f49
parent     4a90276ab22d6989dffb2ee2d8118d9253365646 (diff)
[SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent files from being processed multiple times
Because of a corner case, a file already selected for batch t can get considered again for
batch t+2. This refactoring fixes it by remembering all the files selected in the last
1 minute, so that this corner case does not arise. It also uses the Spark context's Hadoop
configuration to access the file system API for listing directories.

pwendell Please take a look. I still have not run long-running integration tests, so I
cannot say for sure whether this has indeed solved the issue. You could do a first pass
on this in the meantime.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3419 from tdas/filestream-fix2 and squashes the following commits:

c19dd8a [Tathagata Das] Addressed PR comments.
513b608 [Tathagata Das] Updated docs.
d364faf [Tathagata Das] Added the current time condition back
5526222 [Tathagata Das] Removed unnecessary imports.
38bb736 [Tathagata Das] Fix long line.
203bbc7 [Tathagata Das] Un-ignore tests.
eaef4e1 [Tathagata Das] Fixed SPARK-4519
9dbd40a [Tathagata Das] Refactored FileInputDStream to remember last few batches.
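For orientation (an editor's sketch, not part of the patch): the API whose per-batch file
selection this commit hardens is the file stream. A minimal usage example, assuming a
SparkConf named `conf` and a hypothetical HDFS path:

    // Standard file stream usage; each batch lists the monitored directory
    // and must decide which files are "new". The bug was that this decision
    // could select the same file twice across non-adjacent batches.
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("hdfs://namenode:8020/incoming")
    lines.count().print()
    ssc.start()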
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala          |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 291
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala          |   2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala        | 106
4 files changed, 245 insertions(+), 156 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eabd61d713..dbf1ebbaf6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -254,7 +254,7 @@ abstract class DStream[T: ClassTag] (
}
private[streaming] def remember(duration: Duration) {
- if (duration != null && duration > rememberDuration) {
+ if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
}
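The one-line change above matters because `rememberDuration` starts out null, and `Duration`'s
comparison dereferences its argument; before this fix, the first `remember()` call on a fresh
DStream would throw instead of setting the duration. A minimal sketch of the intended update
rule (the helper name is the editor's, not the patch's):

    import org.apache.spark.streaming.Duration

    // Keep the larger of the current and requested remember durations,
    // treating an unset (null) current value as "no duration yet".
    def mergedRememberDuration(current: Duration, requested: Duration): Duration =
      if (requested == null) current
      else if (current == null || requested > current) requested
      else current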
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 55d6cf6a78..5f13fdc557 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,55 @@
package org.apache.spark.streaming.dstream
-import java.io.{ObjectInputStream, IOException}
-import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{IOException, ObjectInputStream}
+
+import scala.collection.mutable
import scala.reflect.ClassTag
+
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming._
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
+/**
+ * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
+ * files and creates a stream out of them. The way it works is as follows.
+ *
+ * At each batch interval, the file system is queried for files in the given directory and
+ * detected new files are selected for that batch. In this case "new" means files that
+ * became visible to readers during that time period. Some extra care is needed to deal
+ * with the fact that files may become visible after they are created. For this purpose, this
+ * class remembers the information about the files selected in past batches for
+ * a certain duration (say, "remember window") as shown in the figure below.
+ *
+ * |<----- remember window ----->|
+ * ignore threshold --->| |<--- current batch time
+ * |____.____.____.____.____.____|
+ * | | | | | | |
+ * ---------------------|----|----|----|----|----|----|-----------------------> Time
+ * |____|____|____|____|____|____|
+ * remembered batches
+ *
+ * The trailing end of the window is the "ignore threshold" and all files whose mod times
+ * are less than this threshold are assumed to have already been selected and are therefore
+ * ignored. Files whose mod times are within the "remember window" are checked against files
+ * that have already been selected. At a high level, this is how new files are identified in
+ * each batch - files whose mod times are greater than the ignore threshold and
+ * have not been considered within the remember window. See the documentation on the method
+ * `isNewFile` for more details.
+ *
+ * This makes some assumptions about the underlying file system that it is monitoring.
+ * - The clock of the file system is assumed to be synchronized with the clock of the machine
+ * running the streaming app.
+ * - If a file is to be visible in the directory listings, it must be visible within a certain
+ * duration of the mod time of the file. This duration is the "remember window", which is set to
+ * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+ * selected as the mod time will be less than the ignore threshold when it becomes visible.
+ * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
+ * processing semantics are undefined.
+ */
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@transient ssc_ : StreamingContext,
@@ -37,22 +74,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
+ // Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- // files found in the last interval
- private val lastFoundFiles = new HashSet[String]
+ // Initial ignore threshold based on which old, existing files in the directory (at the time of
+ // starting the streaming application) will be ignored or considered
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+
+ /*
+ * Make sure that the information on files selected in the last few batches is remembered.
+ * This allows us to filter out files that are not too old but have already been
+ * selected and processed recently.
+ */
+ private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+ private val durationToRemember = slideDuration * numBatchesToRemember
+ remember(durationToRemember)
- // Files with mod time earlier than this is ignored. This is updated every interval
- // such that in the current interval, files older than any file found in the
- // previous interval will be ignored. Obviously this time keeps moving forward.
- private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
+ // Map of batch-time to selected file info for the remembered batches
+ @transient private[streaming] var batchTimeToSelectedFiles =
+ new mutable.HashMap[Time, Array[String]]
+
+ // Set of files that were selected in the remembered batches
+ @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
+
+ // Read-through cache of file mod times, used to speed up mod time lookups
+ @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
+
+ // Timestamp of the last round of finding files
+ @transient private var lastNewFileFindingTime = 0L
- // Latest file mod time seen till any point of time
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- @transient private[streaming] var files = new HashMap[Time, Array[String]]
- @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
- @transient private var lastNewFileFindingTime = 0L
override def start() { }
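// Editor's aside (illustrative sketch, not part of this patch): the transient
// structures above split the bookkeeping. batchTimeToSelectedFiles is the
// per-batch record that gets checkpointed; recentlySelectedFiles is its
// flattened view for fast "seen already?" membership tests; fileToModTime
// caches getFileStatus lookups. How the first two stay in sync (this mirrors
// compute() and clearMetadata() below):
def select(t: Time, files: Array[String]): Unit = {
  batchTimeToSelectedFiles += ((t, files))  // remembered per batch
  recentlySelectedFiles ++= files           // remembered as a flat set
}
def forget(before: Time): Unit = {
  val old = batchTimeToSelectedFiles.filter(_._1 < before)
  batchTimeToSelectedFiles --= old.keys
  recentlySelectedFiles --= old.values.flatten
}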
@@ -68,54 +120,113 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
- assert(validTime.milliseconds >= ignoreTime,
- "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-
// Find new files
- val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
+ val newFiles = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
- if (!newFiles.isEmpty) {
- lastFoundFiles.clear()
- lastFoundFiles ++= newFiles
- ignoreTime = minNewFileModTime
- }
- files += ((validTime, newFiles.toArray))
+ batchTimeToSelectedFiles += ((validTime, newFiles))
+ recentlySelectedFiles ++= newFiles
Some(filesToRDD(newFiles))
}
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
- val oldFiles = files.filter(_._1 < (time - rememberDuration))
- files --= oldFiles.keys
+ val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+ batchTimeToSelectedFiles --= oldFiles.keys
+ recentlySelectedFiles --= oldFiles.values.flatten
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
// Delete file mod times that weren't accessed in the last round of getting new files
- fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
+ fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}
/**
- * Find files which have modification timestamp <= current time and return a 3-tuple of
- * (new files found, latest modification time among them, files with latest modification time)
+ * Find new files for the batch of `currentTime`. This is done by first calculating the
+ * ignore threshold for file mod times, and then getting a list of files filtered based on
+ * the current batch time and the ignore threshold. The ignore threshold is the max of the
+ * initial ignore threshold and the trailing end of the remember window (that is, whichever
+ * is later in time).
*/
- private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
- logDebug("Trying to get new files for time " + currentTime)
- lastNewFileFindingTime = System.currentTimeMillis
- val filter = new CustomPathFilter(currentTime)
- val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
- logInfo("Finding new files took " + timeTaken + " ms")
- logDebug("# cached file times = " + fileModTimes.size)
- if (timeTaken > slideDuration.milliseconds) {
- logWarning(
- "Time taken to find new files exceeds the batch size. " +
- "Consider increasing the batch size or reduceing the number of " +
- "files in the monitored directory."
+ private def findNewFiles(currentTime: Long): Array[String] = {
+ try {
+ lastNewFileFindingTime = System.currentTimeMillis
+
+ // Calculate ignore threshold
+ val modTimeIgnoreThreshold = math.max(
+ initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting
+ currentTime - durationToRemember.milliseconds // trailing end of the remember window
)
+ logDebug(s"Getting new files for time $currentTime, " +
+ s"ignoring files older than $modTimeIgnoreThreshold")
+ val filter = new PathFilter {
+ def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
+ }
+ val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+ val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+ logInfo("Finding new files took " + timeTaken + " ms")
+ logDebug("# cached file times = " + fileToModTime.size)
+ if (timeTaken > slideDuration.milliseconds) {
+ logWarning(
+ "Time taken to find new files exceeds the batch size. " +
+ "Consider increasing the batch size or reducing the number of " +
+ "files in the monitored directory."
+ )
+ }
+ newFiles
+ } catch {
+ case e: Exception =>
+ logWarning("Error finding new files", e)
+ reset()
+ Array.empty
+ }
+ }
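// Editor's aside: worked threshold arithmetic (hypothetical numbers, not from
// this patch). With 10-second batches, durationToRemember is 60000 ms, so for
// the batch at currentTime = 100000 ms with newFilesOnly = false, standalone:
val initialThreshold = 0L                       // newFilesOnly = false
val currentTimeMs = 100000L
val rememberWindowMs = 60000L                   // 6 batches of 10s
val ignoreThreshold = math.max(initialThreshold, currentTimeMs - rememberWindowMs)
assert(ignoreThreshold == 40000L)               // mod times <= 40000 are ignored;
                                                // (40000, 100000] is checked for dedup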
+
+ /**
+ * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be
+ * accepted, it has to pass the following criteria.
+ * - It must pass the user-provided file filter.
+ * - It must be newer than the ignore threshold. It is assumed that files older than the ignore
+ * threshold have already been considered or are existing files before start
+ * (when newFilesOnly = true).
+ * - It must not be present in the recently selected files that this class remembers.
+ * - It must not be newer than the time of the batch (i.e. `currentTime`) for which this
+ * file is being tested. This can occur if the driver was recovered, and the missing batches
+ * (during downtime) are being generated. In that case, a batch of time T may be generated
+ * at time T+x. Say x = 5. If that batch of time T contains a file with mod time T+5, then
+ * bad things can happen. Say the selected files are remembered for 60 seconds. At time T+61,
+ * the batch of time T is forgotten, and the ignore threshold is still T+1.
+ * The file with mod time T+5 is no longer remembered and cannot be ignored (since T+5 > T+1).
+ * Hence it can get selected as a new file again. To prevent this, files whose mod time is
+ * more than the current batch time are not considered.
+ */
+ private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+ val pathStr = path.toString
+ // Reject file if it does not satisfy filter
+ if (!filter(path)) {
+ logDebug(s"$pathStr rejected by filter")
+ return false
+ }
+ // Reject file if it was created before the ignore time
+ val modTime = getFileModTime(path)
+ if (modTime <= modTimeIgnoreThreshold) {
+ // Use <= instead of < to avoid SPARK-4518
+ logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
+ return false
}
- (newFiles, filter.minNewFileModTime)
+ // Reject file if mod time > current batch time
+ if (modTime > currentTime) {
+ logDebug(s"$pathStr not selected as mod time $modTime > current time $currentTime")
+ return false
+ }
+ // Reject file if it was considered earlier
+ if (recentlySelectedFiles.contains(pathStr)) {
+ logDebug(s"$pathStr already considered")
+ return false
+ }
+ logDebug(s"$pathStr accepted with mod time $modTime")
+ return true
}
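// Editor's aside: the four checks above, restated as one pure predicate
// (hypothetical helper, kept free of I/O for clarity; not code from this patch):
def accepts(
    passesFilter: Boolean,
    modTime: Long,
    currentTime: Long,
    ignoreThreshold: Long,
    seenRecently: Boolean): Boolean = {
  passesFilter &&
    modTime > ignoreThreshold &&  // strict '>' is the SPARK-4518 fix: ties are ignored
    modTime <= currentTime &&     // files "from the future" wait for a later batch
    !seenRecently                 // dedup against the remember window
}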
/** Generate one RDD from an array of files */
@@ -132,21 +243,21 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
new UnionRDD(context.sparkContext, fileRDDs)
}
+ /** Get file mod time from cache or fetch it from the file system */
+ private def getFileModTime(path: Path) = {
+ fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
+ }
+
private def directoryPath: Path = {
if (path_ == null) path_ = new Path(directory)
path_
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
+ if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
fs_
}
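// Editor's aside (SPARK-4519): using ssc.sparkContext.hadoopConfiguration instead
// of `new Configuration()` means Hadoop settings supplied through the Spark conf
// now reach the directory listing. A hypothetical illustration, assuming the
// usual propagation of spark.hadoop.* keys into hadoopConfiguration:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("file-stream-app")
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")  // copied into hadoopConfiguration
val ssc2 = new StreamingContext(sparkConf, Seconds(10))
// fs.listStatus in findNewFiles now resolves paths against this configuration.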
- private def getFileModTime(path: Path) = {
- // Get file mod time from cache or fetch it from the file system
- fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
- }
-
private def reset() {
fs_ = null
}
@@ -155,9 +266,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
- generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
- files = new HashMap[Time, Array[String]]
- fileModTimes = new TimeStampedHashMap[String, Long](true)
+ generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+ batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+ recentlySelectedFiles = new mutable.HashSet[String]()
+ fileToModTime = new TimeStampedHashMap[String, Long](true)
}
/**
@@ -167,11 +279,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private[streaming]
class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+ def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
override def update(time: Time) {
hadoopFiles.clear()
- hadoopFiles ++= files
+ hadoopFiles ++= batchTimeToSelectedFiles
}
override def cleanup(time: Time) { }
@@ -182,7 +294,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
- files += ((t, f))
+ batchTimeToSelectedFiles += ((t, f))
+ recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
}
}
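// Editor's aside (sketch, not part of this patch): restore() above rebuilds both
// in-memory views from the checkpointed map, so batches replayed after a driver
// failure do not reselect files processed before it. Condensed invariant:
def restoreAll(checkpointed: mutable.HashMap[Time, Array[String]]): Unit = {
  checkpointed.toSeq.sortBy(_._1.milliseconds).foreach { case (t, files) =>
    batchTimeToSelectedFiles += ((t, files))  // per-batch record back in place
    recentlySelectedFiles ++= files           // dedup set back in place
  }
}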
@@ -193,57 +306,25 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
}
+}
+
+private[streaming]
+object FileInputDStream {
/**
- * Custom PathFilter class to find new files that
- * ... have modification time more than ignore time
- * ... have not been seen in the last interval
- * ... have modification time less than maxModTime
+ * Minimum duration for which the information on selected files is remembered. Files with
+ * mod times older than this remember "window" are ignored. So as long as a new file becomes
+ * visible within this window of its mod time, it will get selected in a subsequent batch.
*/
- private[streaming]
- class CustomPathFilter(maxModTime: Long) extends PathFilter {
+ private val MIN_REMEMBER_DURATION = Minutes(1)
- // Minimum of the mod times of new files found in the current interval
- var minNewFileModTime = -1L
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
- def accept(path: Path): Boolean = {
- try {
- if (!filter(path)) { // Reject file if it does not satisfy filter
- logDebug("Rejected by filter " + path)
- return false
- }
- // Reject file if it was found in the last interval
- if (lastFoundFiles.contains(path.toString)) {
- logDebug("Mod time equal to last mod time, but file considered already")
- return false
- }
- val modTime = getFileModTime(path)
- logDebug("Mod time for " + path + " is " + modTime)
- if (modTime < ignoreTime) {
- // Reject file if it was created before the ignore time (or, before last interval)
- logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
- return false
- } else if (modTime > maxModTime) {
- // Reject file if it is too new that considering it may give errors
- logDebug("Mod time more than ")
- return false
- }
- if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
- minNewFileModTime = modTime
- }
- logDebug("Accepted " + path)
- } catch {
- case fnfe: java.io.FileNotFoundException =>
- logWarning("Error finding new files", fnfe)
- reset()
- return false
- }
- true
- }
+ /**
+ * Calculate the number of recent batches to remember, such that all files selected in
+ * at least the last MIN_REMEMBER_DURATION can be remembered.
+ */
+ def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
+ math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}
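// Editor's aside: worked examples of this calculation (hypothetical batch
// durations; callable only from within the streaming package, since the
// object is private[streaming]):
//   calculateNumBatchesToRemember(Seconds(2))  == 30   // 30 * 2s   = 60s  remembered
//   calculateNumBatchesToRemember(Seconds(45)) == 2    //  2 * 45s  = 90s  remembered
//   calculateNumBatchesToRemember(Minutes(2))  == 1    //  1 * 2min = 120s remembered
// The ceiling guarantees that at least MIN_REMEMBER_DURATION of file metadata
// is retained, regardless of batch size.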
-
-private[streaming]
-object FileInputDStream {
- def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e5592e52b0..77ff1ca780 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -265,7 +265,7 @@ class CheckpointSuite extends TestSuiteBase {
// Verify whether files created have been recorded correctly or not
var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index fa04fa326e..307052a4a9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,9 +28,12 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.io.Files
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -38,6 +41,9 @@ import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
import org.apache.spark.rdd.RDD
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.fs.Path
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -91,54 +97,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("file input stream") {
- // Disable manual clock as FileInputDStream does not work with manual clock
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-
- // Set up the streaming context and input streams
- val testDir = Utils.createTempDir()
- val ssc = new StreamingContext(conf, batchDuration)
- val fileStream = ssc.textFileStream(testDir.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(fileStream, outputBuffer)
- outputStream.register()
- ssc.start()
-
- // Create files in the temporary directory so that Spark Streaming can read data from it
- val input = Seq(1, 2, 3, 4, 5)
- val expectedOutput = input.map(_.toString)
- Thread.sleep(1000)
- for (i <- 0 until input.size) {
- val file = new File(testDir, i.toString)
- Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
- logInfo("Created file " + file)
- Thread.sleep(batchDuration.milliseconds)
- Thread.sleep(1000)
- }
- val startTime = System.currentTimeMillis()
- Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
-
- // Verify whether data received by Spark Streaming was as expected
- logInfo("--------------------------------")
- logInfo("output, size = " + outputBuffer.size)
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output, size = " + expectedOutput.size)
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- // (whether the elements were received one in each interval is not verified)
- assert(output.toList === expectedOutput.toList)
-
- Utils.deleteRecursively(testDir)
+ test("file input stream - newFilesOnly = true") {
+ testFileStream(newFilesOnly = true)
+ }
- // Enable manual clock back again for other tests
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ test("file input stream - newFilesOnly = false") {
+ testFileStream(newFilesOnly = false)
}
test("multi-thread receiver") {
@@ -180,7 +144,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(output.sum === numTotalRecords)
}
- test("queue input stream - oneAtATime=true") {
+ test("queue input stream - oneAtATime = true") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val queue = new SynchronizedQueue[RDD[String]]()
@@ -223,7 +187,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("queue input stream - oneAtATime=false") {
+ test("queue input stream - oneAtATime = false") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val queue = new SynchronizedQueue[RDD[String]]()
@@ -268,6 +232,50 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(output(i) === expectedOutput(i))
}
}
+
+ def testFileStream(newFilesOnly: Boolean) {
+ var ssc: StreamingContext = null
+ var testDir: File = null // var, so the finally block sees the dir actually created
+ try {
+ testDir = Utils.createTempDir()
+ val existingFile = new File(testDir, "0")
+ Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+
+ Thread.sleep(1000)
+ // Set up the streaming context and input streams
+ val newConf = conf.clone.set(
+ "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ ssc = new StreamingContext(newConf, batchDuration)
+ val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+ testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Create files in the directory
+ val input = Seq(1, 2, 3, 4, 5)
+ input.foreach { i =>
+ Thread.sleep(batchDuration.milliseconds)
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charset.forName("UTF-8"))
+ logInfo("Created file " + file)
+ }
+
+ // Verify that all the files have been read
+ val expectedOutput = if (newFilesOnly) {
+ input.map(_.toString).toSet
+ } else {
+ (Seq(0) ++ input).map(_.toString).toSet
+ }
+ eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+ assert(outputBuffer.flatten.toSet === expectedOutput)
+ }
+ } finally {
+ if (ssc != null) ssc.stop()
+ if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
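// Editor's aside: `eventually` (ScalaTest's Eventually trait, imported above)
// polls its block until the assertion passes or the timeout lapses, replacing
// the fixed Thread.sleep waits of the old test. The pattern in isolation
// (timeout value and helper name hypothetical):
def awaitOutput(expected: Set[String], buffer: ArrayBuffer[Seq[String]]): Unit = {
  eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    assert(buffer.flatten.toSet === expected)  // re-evaluated on each poll
  }
}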
}