From cb0e9b0980f38befe88bf52aa037fe33262730f7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 24 Nov 2014 13:50:20 -0800 Subject: [SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent files from being processed multiple times Because of a corner case, a file already selected for batch t can get considered again for batch t+2. This refactoring fixes it by remembering all the files selected in the last 1 minute, so that this corner case does not arise. Also uses spark context's hadoop configuration to access the file system API for listing directories. pwendell Please take look. I still have not run long-running integration tests, so I cannot say for sure whether this has indeed solved the issue. You could do a first pass on this in the meantime. Author: Tathagata Das Closes #3419 from tdas/filestream-fix2 and squashes the following commits: c19dd8a [Tathagata Das] Addressed PR comments. 513b608 [Tathagata Das] Updated docs. d364faf [Tathagata Das] Added the current time condition back 5526222 [Tathagata Das] Removed unnecessary imports. 38bb736 [Tathagata Das] Fix long line. 203bbc7 [Tathagata Das] Un-ignore tests. eaef4e1 [Tathagata Das] Fixed SPARK-4519 9dbd40a [Tathagata Das] Refactored FileInputDStream to remember last few batches. --- .../apache/spark/streaming/dstream/DStream.scala | 2 +- .../spark/streaming/dstream/FileInputDStream.scala | 291 +++++++++++++-------- .../apache/spark/streaming/CheckpointSuite.scala | 2 +- .../apache/spark/streaming/InputStreamsSuite.scala | 106 ++++---- 4 files changed, 245 insertions(+), 156 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index eabd61d713..dbf1ebbaf6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -254,7 +254,7 @@ abstract class DStream[T: ClassTag] ( } private[streaming] def remember(duration: Duration) { - if (duration != null && duration > rememberDuration) { + if (duration != null && (rememberDuration == null || duration > rememberDuration)) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 55d6cf6a78..5f13fdc557 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,18 +17,55 @@ package org.apache.spark.streaming.dstream -import java.io.{ObjectInputStream, IOException} -import scala.collection.mutable.{HashSet, HashMap} +import java.io.{IOException, ObjectInputStream} + +import scala.collection.mutable import scala.reflect.ClassTag + import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{StreamingContext, Time} -import org.apache.spark.util.{TimeStampedHashMap, Utils} +import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.streaming._ +import org.apache.spark.util.{TimeStampedHashMap, Utils} +/** + * This class 
represents an input stream that monitors a Hadoop-compatible filesystem for new + * files and creates a stream out of them. The way it works as follows. + * + * At each batch interval, the file system is queried for files in the given directory and + * detected new files are selected for that batch. In this case "new" means files that + * became visible to readers during that time period. Some extra care is needed to deal + * with the fact that files may become visible after they are created. For this purpose, this + * class remembers the information about the files selected in past batches for + * a certain duration (say, "remember window") as shown in the figure below. + * + * |<----- remember window ----->| + * ignore threshold --->| |<--- current batch time + * |____.____.____.____.____.____| + * | | | | | | | + * ---------------------|----|----|----|----|----|----|-----------------------> Time + * |____|____|____|____|____|____| + * remembered batches + * + * The trailing end of the window is the "ignore threshold" and all files whose mod times + * are less than this threshold are assumed to have already been selected and are therefore + * ignored. Files whose mod times are within the "remember window" are checked against files + * that have already been selected. At a high level, this is how new files are identified in + * each batch - files whose mod times are greater than the ignore threshold and + * have not been considered within the remember window. See the documentation on the method + * `isNewFile` for more details. + * + * This makes some assumptions from the underlying file system that the system is monitoring. + * - The clock of the file system is assumed to synchronized with the clock of the machine running + * the streaming app. + * - If a file is to be visible in the directory listings, it must be visible within a certain + * duration of the mod time of the file. This duration is the "remember window", which is set to + * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be + * selected as the mod time will be less than the ignore threshold when it becomes visible. + * - Once a file is visible, the mod time cannot change. If it does due to appends, then the + * processing semantics are undefined. + */ private[streaming] class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @transient ssc_ : StreamingContext, @@ -37,22 +74,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { + // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData - // files found in the last interval - private val lastFoundFiles = new HashSet[String] + // Initial ignore threshold based on which old, existing files in the directory (at the time of + // starting the streaming application) will be ignored or considered + private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L + + /* + * Make sure that the information of files selected in the last few batches are remembered. + * This would allow us to filter away not-too-old files which have already been recently + * selected and processed. 
+ */ + private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration) + private val durationToRemember = slideDuration * numBatchesToRemember + remember(durationToRemember) - // Files with mod time earlier than this is ignored. This is updated every interval - // such that in the current interval, files older than any file found in the - // previous interval will be ignored. Obviously this time keeps moving forward. - private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L + // Map of batch-time to selected file info for the remembered batches + @transient private[streaming] var batchTimeToSelectedFiles = + new mutable.HashMap[Time, Array[String]] + + // Set of files that were selected in the remembered batches + @transient private var recentlySelectedFiles = new mutable.HashSet[String]() + + // Read-through cache of file mod times, used to speed up mod time lookups + @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) + + // Timestamp of the last round of finding files + @transient private var lastNewFileFindingTime = 0L - // Latest file mod time seen till any point of time @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - @transient private[streaming] var files = new HashMap[Time, Array[String]] - @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true) - @transient private var lastNewFileFindingTime = 0L override def start() { } @@ -68,54 +120,113 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * the previous call. */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { - assert(validTime.milliseconds >= ignoreTime, - "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]") - // Find new files - val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds) + val newFiles = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) - if (!newFiles.isEmpty) { - lastFoundFiles.clear() - lastFoundFiles ++= newFiles - ignoreTime = minNewFileModTime - } - files += ((validTime, newFiles.toArray)) + batchTimeToSelectedFiles += ((validTime, newFiles)) + recentlySelectedFiles ++= newFiles Some(filesToRDD(newFiles)) } /** Clear the old time-to-files mappings along with old RDDs */ protected[streaming] override def clearMetadata(time: Time) { super.clearMetadata(time) - val oldFiles = files.filter(_._1 < (time - rememberDuration)) - files --= oldFiles.keys + val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration)) + batchTimeToSelectedFiles --= oldFiles.keys + recentlySelectedFiles --= oldFiles.values.flatten logInfo("Cleared " + oldFiles.size + " old files that were older than " + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) logDebug("Cleared files are:\n" + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) // Delete file mod times that weren't accessed in the last round of getting new files - fileModTimes.clearOldValues(lastNewFileFindingTime - 1) + fileToModTime.clearOldValues(lastNewFileFindingTime - 1) } /** - * Find files which have modification timestamp <= current time and return a 3-tuple of - * (new files found, latest modification time among them, files with latest modification time) + * Find new files for the batch of `currentTime`. 
This is done by first calculating the + * ignore threshold for file mod times, and then getting a list of files filtered based on + * the current batch time and the ignore threshold. The ignore threshold is the max of + * initial ignore threshold and the trailing end of the remember window (that is, which ever + * is later in time). */ - private def findNewFiles(currentTime: Long): (Seq[String], Long) = { - logDebug("Trying to get new files for time " + currentTime) - lastNewFileFindingTime = System.currentTimeMillis - val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) - val timeTaken = System.currentTimeMillis - lastNewFileFindingTime - logInfo("Finding new files took " + timeTaken + " ms") - logDebug("# cached file times = " + fileModTimes.size) - if (timeTaken > slideDuration.milliseconds) { - logWarning( - "Time taken to find new files exceeds the batch size. " + - "Consider increasing the batch size or reduceing the number of " + - "files in the monitored directory." + private def findNewFiles(currentTime: Long): Array[String] = { + try { + lastNewFileFindingTime = System.currentTimeMillis + + // Calculate ignore threshold + val modTimeIgnoreThreshold = math.max( + initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting + currentTime - durationToRemember.milliseconds // trailing end of the remember window ) + logDebug(s"Getting new files for time $currentTime, " + + s"ignoring files older than $modTimeIgnoreThreshold") + val filter = new PathFilter { + def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) + } + val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val timeTaken = System.currentTimeMillis - lastNewFileFindingTime + logInfo("Finding new files took " + timeTaken + " ms") + logDebug("# cached file times = " + fileToModTime.size) + if (timeTaken > slideDuration.milliseconds) { + logWarning( + "Time taken to find new files exceeds the batch size. " + + "Consider increasing the batch size or reducing the number of " + + "files in the monitored directory." + ) + } + newFiles + } catch { + case e: Exception => + logWarning("Error finding new files", e) + reset() + Array.empty + } + } + + /** + * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be + * accepted, it has to pass the following criteria. + * - It must pass the user-provided file filter. + * - It must be newer than the ignore threshold. It is assumed that files older than the ignore + * threshold have already been considered or are existing files before start + * (when newFileOnly = true). + * - It must not be present in the recently selected files that this class remembers. + * - It must not be newer than the time of the batch (i.e. `currentTime` for which this + * file is being tested. This can occur if the driver was recovered, and the missing batches + * (during downtime) are being generated. In that case, a batch of time T may be generated + * at time T+x. Say x = 5. If that batch T contains file of mod time T+5, then bad things can + * happen. Let's say the selected files are remembered for 60 seconds. At time t+61, + * the batch of time t is forgotten, and the ignore threshold is still T+1. + * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). + * Hence they can get selected as new files again. 
To prevent this, files whose mod time is more + * than current batch time are not considered. + */ + private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + val pathStr = path.toString + // Reject file if it does not satisfy filter + if (!filter(path)) { + logDebug(s"$pathStr rejected by filter") + return false + } + // Reject file if it was created before the ignore time + val modTime = getFileModTime(path) + if (modTime <= modTimeIgnoreThreshold) { + // Use <= instead of < to avoid SPARK-4518 + logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold") + return false } - (newFiles, filter.minNewFileModTime) + // Reject file if mod time > current batch time + if (modTime > currentTime) { + logDebug(s"$pathStr not selected as mod time $modTime > current time $currentTime") + return false + } + // Reject file if it was considered earlier + if (recentlySelectedFiles.contains(pathStr)) { + logDebug(s"$pathStr already considered") + return false + } + logDebug(s"$pathStr accepted with mod time $modTime") + return true } /** Generate one RDD from an array of files */ @@ -132,21 +243,21 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas new UnionRDD(context.sparkContext, fileRDDs) } + /** Get file mod time from cache or fetch it from the file system */ + private def getFileModTime(path: Path) = { + fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) + } + private def directoryPath: Path = { if (path_ == null) path_ = new Path(directory) path_ } private def fs: FileSystem = { - if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration()) + if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration) fs_ } - private def getFileModTime(path: Path) = { - // Get file mod time from cache or fetch it from the file system - fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) - } - private def reset() { fs_ = null } @@ -155,9 +266,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() - generatedRDDs = new HashMap[Time, RDD[(K,V)]] () - files = new HashMap[Time, Array[String]] - fileModTimes = new TimeStampedHashMap[String, Long](true) + generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] () + batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]() + recentlySelectedFiles = new mutable.HashSet[String]() + fileToModTime = new TimeStampedHashMap[String, Long](true) } /** @@ -167,11 +279,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private[streaming] class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]] override def update(time: Time) { hadoopFiles.clear() - hadoopFiles ++= files + hadoopFiles ++= batchTimeToSelectedFiles } override def cleanup(time: Time) { } @@ -182,7 +294,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Restore the metadata in both files and generatedRDDs logInfo("Restoring files for time " + t + " - " + f.mkString("[", ", ", "]") ) - files += ((t, f)) + batchTimeToSelectedFiles += ((t, f)) + recentlySelectedFiles ++= 
f generatedRDDs += ((t, filesToRDD(f))) } } @@ -193,57 +306,25 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]" } } +} + +private[streaming] +object FileInputDStream { /** - * Custom PathFilter class to find new files that - * ... have modification time more than ignore time - * ... have not been seen in the last interval - * ... have modification time less than maxModTime + * Minimum duration of remembering the information of selected files. Files with mod times + * older than this "window" of remembering will be ignored. So if new files are visible + * within this window, then the file will get selected in the next batch. */ - private[streaming] - class CustomPathFilter(maxModTime: Long) extends PathFilter { + private val MIN_REMEMBER_DURATION = Minutes(1) - // Minimum of the mod times of new files found in the current interval - var minNewFileModTime = -1L + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") - def accept(path: Path): Boolean = { - try { - if (!filter(path)) { // Reject file if it does not satisfy filter - logDebug("Rejected by filter " + path) - return false - } - // Reject file if it was found in the last interval - if (lastFoundFiles.contains(path.toString)) { - logDebug("Mod time equal to last mod time, but file considered already") - return false - } - val modTime = getFileModTime(path) - logDebug("Mod time for " + path + " is " + modTime) - if (modTime < ignoreTime) { - // Reject file if it was created before the ignore time (or, before last interval) - logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime) - return false - } else if (modTime > maxModTime) { - // Reject file if it is too new that considering it may give errors - logDebug("Mod time more than ") - return false - } - if (minNewFileModTime < 0 || modTime < minNewFileModTime) { - minNewFileModTime = modTime - } - logDebug("Accepted " + path) - } catch { - case fnfe: java.io.FileNotFoundException => - logWarning("Error finding new files", fnfe) - reset() - return false - } - true - } + /** + * Calculate the number of last batches to remember, such that all the files selected in + * at least last MIN_REMEMBER_DURATION duration can be remembered. 
+ */ + def calculateNumBatchesToRemember(batchDuration: Duration): Int = { + math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt } } - -private[streaming] -object FileInputDStream { - def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index e5592e52b0..77ff1ca780 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -265,7 +265,7 @@ class CheckpointSuite extends TestSuiteBase { // Verify whether files created have been recorded correctly or not var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - def recordedFiles = fileInputDStream.files.values.flatMap(x => x) + def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index fa04fa326e..307052a4a9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -28,9 +28,12 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue} +import scala.concurrent.duration._ +import scala.language.postfixOps import com.google.common.io.Files import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually._ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel @@ -38,6 +41,9 @@ import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.streaming.receiver.{ActorHelper, Receiver} import org.apache.spark.rdd.RDD +import org.apache.hadoop.io.{Text, LongWritable} +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.fs.Path class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -91,54 +97,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("file input stream") { - // Disable manual clock as FileInputDStream does not work with manual clock - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - - // Set up the streaming context and input streams - val testDir = Utils.createTempDir() - val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the temporary directory so that Spark Streaming can read data from it - val input = Seq(1, 2, 3, 4, 5) - val expectedOutput = input.map(_.toString) - Thread.sleep(1000) - for (i <- 0 until input.size) { - val file = new File(testDir, i.toString) - Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) - logInfo("Created file " + 
file) - Thread.sleep(batchDuration.milliseconds) - Thread.sleep(1000) - } - val startTime = System.currentTimeMillis() - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - // Verify whether data received by Spark Streaming was as expected - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - // (whether the elements were received one in each interval is not verified) - assert(output.toList === expectedOutput.toList) - - Utils.deleteRecursively(testDir) + test("file input stream - newFilesOnly = true") { + testFileStream(newFilesOnly = true) + } - // Enable manual clock back again for other tests - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + test("file input stream - newFilesOnly = false") { + testFileStream(newFilesOnly = false) } test("multi-thread receiver") { @@ -180,7 +144,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output.sum === numTotalRecords) } - test("queue input stream - oneAtATime=true") { + test("queue input stream - oneAtATime = true") { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val queue = new SynchronizedQueue[RDD[String]]() @@ -223,7 +187,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("queue input stream - oneAtATime=false") { + test("queue input stream - oneAtATime = false") { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val queue = new SynchronizedQueue[RDD[String]]() @@ -268,6 +232,50 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i) === expectedOutput(i)) } } + + def testFileStream(newFilesOnly: Boolean) { + var ssc: StreamingContext = null + val testDir: File = null + try { + val testDir = Utils.createTempDir() + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, Charset.forName("UTF-8")) + + Thread.sleep(1000) + // Set up the streaming context and input streams + val newConf = conf.clone.set( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + ssc = new StreamingContext(newConf, batchDuration) + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Create files in the directory + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + Thread.sleep(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(i + "\n", file, Charset.forName("UTF-8")) + logInfo("Created file " + file) + } + + // Verify that all the files have been read + val expectedOutput = if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet + } + eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 
milliseconds)) { + assert(outputBuffer.flatten.toSet === expectedOutput) + } + } finally { + if (ssc != null) ssc.stop() + if (testDir != null) Utils.deleteRecursively(testDir) + } + } } -- cgit v1.2.3
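
The remember window introduced by this patch drives how many past batches the refactored FileInputDStream keeps: enough batches to cover at least MIN_REMEMBER_DURATION (one minute). A minimal standalone sketch of that calculation, using a plain millisecond constant in place of the Spark Duration/Minutes types, looks like this:

object RememberWindowSketch {
  // Stand-in for FileInputDStream.MIN_REMEMBER_DURATION (Minutes(1)), in milliseconds.
  val minRememberDurationMs: Long = 60 * 1000L

  // Mirrors calculateNumBatchesToRemember: round up so that the remembered
  // batches span at least one minute of history.
  def numBatchesToRemember(batchDurationMs: Long): Int =
    math.ceil(minRememberDurationMs.toDouble / batchDurationMs).toInt

  def main(args: Array[String]): Unit = {
    println(numBatchesToRemember(2000))   // 2 s batches  -> 30 batches (exactly 60 s)
    println(numBatchesToRemember(45000))  // 45 s batches -> 2 batches (90 s, at least 60 s)
  }
}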
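
The selection rules spelled out in the isNewFile scaladoc can be condensed into a small predicate. The sketch below is a simplified model under the same assumptions the patch states (mod times do not change after a file is visible, and the file system clock matches the driver clock); the class and field names are illustrative, not the actual FileInputDStream internals:

import scala.collection.mutable

class NewFileSelector(initialIgnoreThreshold: Long, rememberWindowMs: Long) {
  // Files picked up in the remembered batches; entries age out with their batch.
  private val recentlySelected = mutable.HashSet[String]()

  /** Should a file with the given mod time be selected for the batch at `batchTime`? */
  def isNewFile(path: String, modTime: Long, batchTime: Long): Boolean = {
    val ignoreThreshold = math.max(initialIgnoreThreshold, batchTime - rememberWindowMs)
    if (modTime <= ignoreThreshold) false           // too old; <= rather than < is the SPARK-4518 fix
    else if (modTime > batchTime) false             // too new; belongs to a later batch (recovery case)
    else if (recentlySelected.contains(path)) false // already selected within the remember window
    else { recentlySelected += path; true }
  }
}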
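
From an application's point of view the refactoring is transparent; a typical driver that exercises the file stream (the app name and input path here are hypothetical) still looks like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileStreamExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    // With newFilesOnly left at its default (true), only files whose mod time falls
    // after stream start, and within the one-minute remember window of becoming
    // visible, are read; files already selected are not reprocessed in later batches.
    val lines = ssc.textFileStream("/tmp/streaming-input")
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}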