author    Liwei Lin <lwlin7@gmail.com>    2017-03-09 11:02:44 -0800
committer Shixiong Zhu <shixiong@databricks.com>    2017-03-09 11:02:44 -0800
commit    40da4d181d648308de85fdcabc5c098ee861949a (patch)
tree      270f1d888c4b879cb4c525f9208352e98e949624 /sql
parent    3232e54f2fcb8d2072cba4bc763ef29d5d8d325f (diff)
[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this causes false negatives when the path has changed in a cosmetic way (e.g. changing `s3n` to `s3a`): a file that has already been processed is treated as new and processed again. This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (but still stores the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17120 from lw-lin/filename-only.
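To make the comparison concrete, here is a minimal standalone sketch (not part of the patch) of the key that gets compared once `fileNameOnly` is enabled; it reuses the same `new Path(new URI(path)).getName` expression the patch adds to `SeenFilesMap`:

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

// Cosmetically different paths to the same object collapse to one key, so a
// scheme change (s3n -> s3a) no longer makes an already-seen file look new.
val paths = Seq(
  "file:///dataset.txt",
  "s3://a/dataset.txt",
  "s3n://a/b/dataset.txt",
  "s3a://a/b/c/dataset.txt")
val keys = paths.map(p => new Path(new URI(p)).getName)
assert(keys.forall(_ == "dataset.txt"))
```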
Diffstat (limited to 'sql')
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala | 34
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  | 25
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala       | 22
 3 files changed, 62 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index e7ba901945..d54ed44b43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
* Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
* trigger, it will first process the latest files.
*/
- val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
- try {
- str.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
- }
- }.getOrElse(false)
+ val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+ /**
+ * Whether to check for new files based only on the filename instead of the full path.
+ *
+ * With this set to `true`, the following files would be considered the same file, because
+ * their filenames, "dataset.txt", are the same:
+ * - "file:///dataset.txt"
+ * - "s3://a/dataset.txt"
+ * - "s3n://a/b/dataset.txt"
+ * - "s3a://a/b/c/dataset.txt"
+ */
+ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+ private def withBooleanParameter(name: String, default: Boolean) = {
+ parameters.get(name).map { str =>
+ try {
+ str.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(
+ s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+ }
+ }.getOrElse(default)
+ }
}
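The refactoring above centralizes the boolean-option parsing that `latestFirst` previously did inline. As a self-contained sketch of that behavior (the plain-`Map` `Options` wrapper below is illustrative, not Spark's `CaseInsensitiveMap`-backed `FileStreamOptions`):

```scala
// Illustrative stand-in: String#toBoolean accepts only "true"/"false"
// (case-insensitively) and throws IllegalArgumentException for anything else,
// which the helper re-raises with a message naming the offending option.
class Options(parameters: Map[String, String]) {
  private def withBooleanParameter(name: String, default: Boolean): Boolean =
    parameters.get(name).map { str =>
      try str.toBoolean
      catch {
        case _: IllegalArgumentException =>
          throw new IllegalArgumentException(
            s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
      }
    }.getOrElse(default)

  val latestFirst: Boolean = withBooleanParameter("latestFirst", default = false)
  val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", default = false)
}

assert(new Options(Map("fileNameOnly" -> "TRUE")).fileNameOnly)
assert(!new Options(Map.empty).latestFirst)
// new Options(Map("fileNameOnly" -> "yes"))
// => java.lang.IllegalArgumentException: Invalid value 'yes' for option 'fileNameOnly', ...
```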
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f09b0a0c8..411a15ffce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.net.URI
+
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
sourceOptions.maxFileAgeMs
}
+ private val fileNameOnly = sourceOptions.fileNameOnly
+ if (fileNameOnly) {
+ logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+ "UUID), otherwise files with the same name but under different paths will be considered " +
+ "the same file and may cause data loss.")
+ }
+
/** A mapping from a file that we have processed to the timestamp when it was last modified. */
// Visible for testing and debugging in production.
- val seenFiles = new SeenFilesMap(maxFileAgeMs)
+ val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files "maxAgeMs" older than the latest file.
*/
- class SeenFilesMap(maxAgeMs: Long) {
+ class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
require(maxAgeMs >= 0)
/** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@ object FileStreamSource {
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L
+ @inline private def stripPathIfNecessary(path: String) = {
+ if (fileNameOnly) new Path(new URI(path)).getName else path
+ }
+
/** Add a new file to the map. */
def add(path: String, timestamp: Timestamp): Unit = {
- map.put(path, timestamp)
+ map.put(stripPathIfNecessary(path), timestamp)
if (timestamp > latestTimestamp) {
latestTimestamp = timestamp
}
@@ -295,7 +308,7 @@ object FileStreamSource {
def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
}
/** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
}
def size: Int = map.size()
-
- def allEntries: Seq[(String, Timestamp)] = {
- map.asScala.toSeq
- }
}
}
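A toy model of the `SeenFilesMap` change (illustrative: a plain `java.util.HashMap`, no purge or age handling) showing how the stripped key makes `isNewFile` deduplicate across schemes and directories:

```scala
import java.net.URI
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.fs.Path

// With fileNameOnly = true the map is keyed by the final path component, so
// paths differing only in scheme or directory hit the same entry.
class ToySeenFiles(fileNameOnly: Boolean) {
  private val map = new JHashMap[String, Long]
  private def key(path: String): String =
    if (fileNameOnly) new Path(new URI(path)).getName else path
  def add(path: String, ts: Long): Unit = map.put(key(path), ts)
  def isNewFile(path: String): Boolean = !map.containsKey(key(path))
}

val seen = new ToySeenFiles(fileNameOnly = true)
seen.add("s3n://bucket/d", 5L)
assert(!seen.isNewFile("s3a://bucket/d")) // same filename => already seen
assert(seen.isNewFile("s3a://bucket/e")) // different filename => new
```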
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 0517b0a800..f705da3d6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1236,7 +1236,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
test("SeenFilesMap") {
- val map = new SeenFilesMap(maxAgeMs = 10)
+ val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
map.add("a", 5)
assert(map.size == 1)
@@ -1269,8 +1269,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(map.isNewFile("e", 20))
}
+ test("SeenFilesMap with fileNameOnly = true") {
+ val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
+
+ map.add("file:///a/b/c/d", 5)
+ map.add("file:///a/b/c/e", 5)
+ assert(map.size === 2)
+
+ assert(!map.isNewFile("d", 5))
+ assert(!map.isNewFile("file:///d", 5))
+ assert(!map.isNewFile("file:///x/d", 5))
+ assert(!map.isNewFile("file:///x/y/d", 5))
+
+ map.add("s3:///bucket/d", 5)
+ map.add("s3n:///bucket/d", 5)
+ map.add("s3a:///bucket/d", 5)
+ assert(map.size === 2)
+ }
+
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
- val map = new SeenFilesMap(maxAgeMs = 10)
+ val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
map.add("a", 20)
assert(map.size == 1)
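Putting it together, a hypothetical end-to-end use (the bucket and paths are placeholders, and an existing `SparkSession` named `spark` is assumed); per the warning the patch adds, file names should be globally unique, e.g. by embedding a UUID, because identically named files under different paths are treated as one:

```scala
// Hypothetical usage sketch, not from the patch. With fileNameOnly on, a
// second file with the same name in another directory would be skipped.
val query = spark.readStream
  .option("fileNameOnly", "true")
  .text("s3a://bucket/dir1/dir2")
  .writeStream
  .format("console")
  .start()
query.awaitTermination()
```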