author     Liwei Lin <lwlin7@gmail.com>            2017-03-09 11:02:44 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2017-03-09 11:02:44 -0800
commit     40da4d181d648308de85fdcabc5c098ee861949a (patch)
tree       270f1d888c4b879cb4c525f9208352e98e949624 /sql/core/src/main
parent     3232e54f2fcb8d2072cba4bc763ef29d5d8d325f (diff)
[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (e.g. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (while still storing the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17120 from lw-lin/filename-only.
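For illustration, the heart of the new check is reducing a path to its final component before comparison. The sketch below is not part of the patch (the `fileName` helper and the example URIs are made up, and it assumes hadoop-common is on the classpath), but it uses the same `new Path(new URI(path)).getName` stripping the diff adds:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Illustrative helper: reduce a full URI to just its filename, mirroring what
// the patch does in SeenFilesMap when fileNameOnly is enabled.
def fileName(path: String): String = new Path(new URI(path)).getName

val a = "s3n://bucket/dir1/dir2/dataset.txt"
val b = "s3a://bucket/dir1/dir2/dataset.txt"

a == b                       // false: full-path comparison sees two different files
fileName(a) == fileName(b)   // true:  filename-only comparison treats them as the same file
```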
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala    25
2 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index e7ba901945..d54ed44b43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
* Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
* trigger, it will first process the latest files.
*/
- val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
- try {
- str.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
- }
- }.getOrElse(false)
+ val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+ /**
+ * Whether to check for new files based only on the filename instead of the full path.
+ *
+ * With this set to `true`, the following files would be considered the same file, because
+ * their filenames, "dataset.txt", are the same:
+ * - "file:///dataset.txt"
+ * - "s3://a/dataset.txt"
+ * - "s3n://a/b/dataset.txt"
+ * - "s3a://a/b/c/dataset.txt"
+ */
+ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+ private def withBooleanParameter(name: String, default: Boolean) = {
+ parameters.get(name).map { str =>
+ try {
+ str.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(
+ s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+ }
+ }.getOrElse(default)
+ }
}
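As a side note, this refactor funnels both options through the one `withBooleanParameter` helper. Below is a self-contained sketch of that parsing behavior; the standalone `booleanOption` helper is illustrative only, and unlike the real code it operates on a plain `Map`, so option names are not matched case-insensitively here as they are via `CaseInsensitiveMap`:

```scala
// Illustrative stand-in for withBooleanParameter: accept "true"/"false" in any
// case via String.toBoolean, fall back to a default when the key is absent, and
// reject anything else with an error naming the option.
def booleanOption(params: Map[String, String], name: String, default: Boolean): Boolean =
  params.get(name).map { str =>
    try str.toBoolean catch {
      case _: IllegalArgumentException =>
        throw new IllegalArgumentException(
          s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
    }
  }.getOrElse(default)

booleanOption(Map.empty, "fileNameOnly", default = false)              // false (default)
booleanOption(Map("fileNameOnly" -> "TRUE"), "fileNameOnly", false)    // true
// booleanOption(Map("latestFirst" -> "yes"), "latestFirst", false)    // IllegalArgumentException
```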
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f09b0a0c8..411a15ffce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.net.URI
+
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
sourceOptions.maxFileAgeMs
}
+ private val fileNameOnly = sourceOptions.fileNameOnly
+ if (fileNameOnly) {
+ logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+ "UUID), otherwise, files with the same name but under different paths will be considered " +
+ "the same and causes data lost.")
+ }
+
/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
- val seenFiles = new SeenFilesMap(maxFileAgeMs)
+ val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files "maxAgeMs" older than the latest file.
*/
- class SeenFilesMap(maxAgeMs: Long) {
+ class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
require(maxAgeMs >= 0)
/** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@ object FileStreamSource {
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L
+ @inline private def stripPathIfNecessary(path: String) = {
+ if (fileNameOnly) new Path(new URI(path)).getName else path
+ }
+
/** Add a new file to the map. */
def add(path: String, timestamp: Timestamp): Unit = {
- map.put(path, timestamp)
+ map.put(stripPathIfNecessary(path), timestamp)
if (timestamp > latestTimestamp) {
latestTimestamp = timestamp
}
@@ -295,7 +308,7 @@ object FileStreamSource {
def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
}
/** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
}
def size: Int = map.size()
-
- def allEntries: Seq[(String, Timestamp)] = {
- map.asScala.toSeq
- }
}
}
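To see the end-to-end effect of the `SeenFilesMap` change, here is a rough usage sketch. The timestamps are arbitrary, and it assumes access to the internal `FileStreamSource.SeenFilesMap`, which is realistically only available from Spark's own `sql/core` test scope:

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSource.SeenFilesMap

val seen = new SeenFilesMap(maxAgeMs = 3600000L, fileNameOnly = true)
seen.add("s3n://a/b/dataset.txt", 1000L)

// A cosmetic path change no longer makes the file look new...
seen.isNewFile("s3a://a/b/dataset.txt", 1000L)          // false
// ...and neither does any other path ending in the same filename,
// which is exactly why the logWarning above asks for unique file names.
seen.isNewFile("file:///other/dir/dataset.txt", 1000L)  // false
// A genuinely different filename is still picked up as new.
seen.isNewFile("s3a://a/b/dataset2.txt", 1000L)         // true
```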