diff options
Diffstat (limited to 'sql/core/src/main')
2 files changed, 42 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index e7ba901945..d54ed44b43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a * trigger, it will first process the latest files. */ - val latestFirst: Boolean = parameters.get("latestFirst").map { str => - try { - str.toBoolean - } catch { - case _: IllegalArgumentException => - throw new IllegalArgumentException( - s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'") - } - }.getOrElse(false) + val latestFirst: Boolean = withBooleanParameter("latestFirst", false) + + /** + * Whether to check new files based on only the filename instead of on the full path. + * + * With this set to `true`, the following files would be considered as the same file, because + * their filenames, "dataset.txt", are the same: + * - "file:///dataset.txt" + * - "s3://a/dataset.txt" + * - "s3n://a/b/dataset.txt" + * - "s3a://a/b/c/dataset.txt" + */ + val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + + private def withBooleanParameter(name: String, default: Boolean) = { + parameters.get(name).map { str => + try { + str.toBoolean + } catch { + case _: IllegalArgumentException => + throw new IllegalArgumentException( + s"Invalid value '$str' for option '$name', must be 'true' or 'false'") + } + }.getOrElse(default) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0f09b0a0c8..411a15ffce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.net.URI + import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileStatus, Path} @@ -79,9 +81,16 @@ class FileStreamSource( sourceOptions.maxFileAgeMs } + private val fileNameOnly = sourceOptions.fileNameOnly + if (fileNameOnly) { + logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " + + "UUID), otherwise, files with the same name but under different paths will be considered " + + "the same and causes data lost.") + } + /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. - val seenFiles = new SeenFilesMap(maxFileAgeMs) + val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) metadataLog.allFiles().foreach { entry => seenFiles.add(entry.path, entry.timestamp) @@ -268,7 +277,7 @@ object FileStreamSource { * To prevent the hash map from growing indefinitely, a purge function is available to * remove files "maxAgeMs" older than the latest file. */ - class SeenFilesMap(maxAgeMs: Long) { + class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) { require(maxAgeMs >= 0) /** Mapping from file to its timestamp. */ @@ -280,9 +289,13 @@ object FileStreamSource { /** Timestamp for the last purge operation. */ private var lastPurgeTimestamp: Timestamp = 0L + @inline private def stripPathIfNecessary(path: String) = { + if (fileNameOnly) new Path(new URI(path)).getName else path + } + /** Add a new file to the map. */ def add(path: String, timestamp: Timestamp): Unit = { - map.put(path, timestamp) + map.put(stripPathIfNecessary(path), timestamp) if (timestamp > latestTimestamp) { latestTimestamp = timestamp } @@ -295,7 +308,7 @@ object FileStreamSource { def isNewFile(path: String, timestamp: Timestamp): Boolean = { // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that // is older than (latestTimestamp - maxAgeMs) but has not been purged yet. - timestamp >= lastPurgeTimestamp && !map.containsKey(path) + timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path)) } /** Removes aged entries and returns the number of files removed. */ @@ -314,9 +327,5 @@ object FileStreamSource { } def size: Int = map.size() - - def allEntries: Seq[(String, Timestamp)] = { - map.asScala.toSeq - } } } |