Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  | 25
2 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index e7ba901945..d54ed44b43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
* Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
* trigger, it will first process the latest files.
*/
- val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
- try {
- str.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
- }
- }.getOrElse(false)
+ val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+ /**
+ * Whether to check new files based only on the filename instead of the full path.
+ *
+ * With this set to `true`, the following files would be considered the same file because
+ * they share the filename "dataset.txt":
+ * - "file:///dataset.txt"
+ * - "s3://a/dataset.txt"
+ * - "s3n://a/b/dataset.txt"
+ * - "s3a://a/b/c/dataset.txt"
+ */
+ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+ private def withBooleanParameter(name: String, default: Boolean) = {
+ parameters.get(name).map { str =>
+ try {
+ str.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(
+ s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+ }
+ }.getOrElse(default)
+ }
}
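
For context, both options would be supplied through the standard DataStreamReader.option calls when a file stream source is created. A minimal, hypothetical usage sketch (the format, path, and option values below are illustrative, not part of this patch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-stream-options-demo").getOrCreate()

// Both options go through withBooleanParameter above, so any value other
// than "true"/"false" fails fast with an IllegalArgumentException.
val lines = spark.readStream
  .format("text")
  .option("latestFirst", "true")   // process the newest unprocessed files first
  .option("fileNameOnly", "true")  // deduplicate files by name, not by full path
  .load("s3a://some-bucket/incoming/")  // illustrative path
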
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f09b0a0c8..411a15ffce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.net.URI
+
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
sourceOptions.maxFileAgeMs
}
+ private val fileNameOnly = sourceOptions.fileNameOnly
+ if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "a UUID); otherwise, files with the same name under different paths are considered " +
+      "the same file, which causes data loss.")
+ }
+
/** A mapping from a file that we have processed to the timestamp when it was last modified. */
// Visible for testing and debugging in production.
- val seenFiles = new SeenFilesMap(maxFileAgeMs)
+ val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove entries that are more than "maxAgeMs" older than the latest file.
*/
- class SeenFilesMap(maxAgeMs: Long) {
+ class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
require(maxAgeMs >= 0)
/** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@ object FileStreamSource {
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L
+ @inline private def stripPathIfNecessary(path: String) = {
+ if (fileNameOnly) new Path(new URI(path)).getName else path
+ }
+
/** Add a new file to the map. */
def add(path: String, timestamp: Timestamp): Unit = {
- map.put(path, timestamp)
+ map.put(stripPathIfNecessary(path), timestamp)
if (timestamp > latestTimestamp) {
latestTimestamp = timestamp
}
@@ -295,7 +308,7 @@ object FileStreamSource {
def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
}
/** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
}
def size: Int = map.size()
-
- def allEntries: Seq[(String, Timestamp)] = {
- map.asScala.toSeq
- }
}
}
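
To make the fileNameOnly semantics concrete, here is a small self-contained sketch of what stripPathIfNecessary does, using the same Hadoop Path plus java.net.URI combination as the patch; the sample URIs are the ones listed in the fileNameOnly scaladoc above:

import java.net.URI
import org.apache.hadoop.fs.Path

object FileNameOnlyDemo {
  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "file:///dataset.txt",
      "s3://a/dataset.txt",
      "s3n://a/b/dataset.txt",
      "s3a://a/b/c/dataset.txt")

    // With fileNameOnly = true, SeenFilesMap keys on Path#getName, so all
    // four URIs collapse to the single entry "dataset.txt".
    paths.foreach { p =>
      println(s"$p -> ${new Path(new URI(p)).getName}")
    }
  }
}

This collapsing is exactly why the source logs a warning when the option is enabled: distinct files whose names collide are treated as already seen, and all but the first are silently skipped.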