aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2017-03-08 14:35:07 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-03-08 14:35:07 -0800
commita3648b5d4f99ff9461d02f53e9ec71787a3abf51 (patch)
treed35ca4eb741d7bb07c2a2aaefee867a5eef8bdaa /sql
parent455129020ca7f6a162f6f2486a87cc43512cfd2c (diff)
downloadspark-a3648b5d4f99ff9461d02f53e9ec71787a3abf51.tar.gz
spark-a3648b5d4f99ff9461d02f53e9ec71787a3abf51.tar.bz2
spark-a3648b5d4f99ff9461d02f53e9ec71787a3abf51.zip
[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days. This causes a problem when both latestFirst = true maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <brkyvz@gmail.com> Closes #17153 from brkyvz/maxFileAge.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala82
3 files changed, 63 insertions, 38 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 2f802d782f..e7ba901945 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}
/**
- * Maximum age of a file that can be found in this directory, before it is deleted.
+ * Maximum age of a file that can be found in this directory, before it is ignored. For the
+ * first batch all files will be considered valid. If `latestFirst` is set to `true` and
+ * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
+ * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. That this means if the last file has timestamp 1000, and the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6a7263ca45..0f09b0a0c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -66,23 +66,29 @@ class FileStreamSource(
private val fileSortOrder = if (sourceOptions.latestFirst) {
logWarning(
- """'latestFirst' is true. New files will be processed first.
- |It may affect the watermark value""".stripMargin)
+ """'latestFirst' is true. New files will be processed first, which may affect the watermark
+ |value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
implicitly[Ordering[Long]].reverse
} else {
implicitly[Ordering[Long]]
}
+ private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
+ Long.MaxValue
+ } else {
+ sourceOptions.maxFileAgeMs
+ }
+
/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
- val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
+ val seenFiles = new SeenFilesMap(maxFileAgeMs)
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()
- logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+ logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
/**
* Returns the maximum offset that can be retrieved from the source.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1586850c77..0517b0a800 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1173,6 +1173,41 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
SerializedOffset(str.trim)
}
+ private def runTwoBatchesAndVerifyResults(
+ src: File,
+ latestFirst: Boolean,
+ firstBatch: String,
+ secondBatch: String,
+ maxFileAge: Option[String] = None): Unit = {
+ val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++
+ maxFileAge.map("maxFileAge" -> _)
+ val fileStream = createFileStream(
+ "text",
+ src.getCanonicalPath,
+ options = srcOptions)
+ val clock = new StreamManualClock()
+ testStream(fileStream)(
+ StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+ AssertOnQuery { _ =>
+ // Block until the first batch finishes.
+ eventually(timeout(streamingTimeout)) {
+ assert(clock.isStreamWaitingAt(0))
+ }
+ true
+ },
+ CheckLastBatch(firstBatch),
+ AdvanceManualClock(10),
+ AssertOnQuery { _ =>
+ // Block until the second batch finishes.
+ eventually(timeout(streamingTimeout)) {
+ assert(clock.isStreamWaitingAt(10))
+ }
+ true
+ },
+ CheckLastBatch(secondBatch)
+ )
+ }
+
test("FileStreamSource - latestFirst") {
withTempDir { src =>
// Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
@@ -1180,42 +1215,23 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val f2 = stringToFile(new File(src, "2.txt"), "2")
f2.setLastModified(f1.lastModified + 1000)
- def runTwoBatchesAndVerifyResults(
- latestFirst: Boolean,
- firstBatch: String,
- secondBatch: String): Unit = {
- val fileStream = createFileStream(
- "text",
- src.getCanonicalPath,
- options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
- val clock = new StreamManualClock()
- testStream(fileStream)(
- StartStream(trigger = ProcessingTime(10), triggerClock = clock),
- AssertOnQuery { _ =>
- // Block until the first batch finishes.
- eventually(timeout(streamingTimeout)) {
- assert(clock.isStreamWaitingAt(0))
- }
- true
- },
- CheckLastBatch(firstBatch),
- AdvanceManualClock(10),
- AssertOnQuery { _ =>
- // Block until the second batch finishes.
- eventually(timeout(streamingTimeout)) {
- assert(clock.isStreamWaitingAt(10))
- }
- true
- },
- CheckLastBatch(secondBatch)
- )
- }
-
// Read oldest files first, so the first batch is "1", and the second batch is "2".
- runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+ runTwoBatchesAndVerifyResults(src, latestFirst = false, firstBatch = "1", secondBatch = "2")
// Read latest files first, so the first batch is "2", and the second batch is "1".
- runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+ runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1")
+ }
+ }
+
+ test("SPARK-19813: Ignore maxFileAge when maxFilesPerTrigger and latestFirst is used") {
+ withTempDir { src =>
+ // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
+ val f1 = stringToFile(new File(src, "1.txt"), "1")
+ val f2 = stringToFile(new File(src, "2.txt"), "2")
+ f2.setLastModified(f1.lastModified + 3600 * 1000 /* 1 hour later */)
+
+ runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1",
+ maxFileAge = Some("1m") /* 1 minute */)
}
}