author    Shixiong Zhu <shixiong@databricks.com>    2016-09-22 23:35:08 -0700
committer Shixiong Zhu <shixiong@databricks.com>    2016-09-22 23:35:08 -0700
commit    62ccf27ab4b55e734646678ae78b7e812262d14b (patch)
tree      73ed5c72bd4e1456671a4ca85d588ee6bdd0a5e2
parent    947b8c6e3acd671d501f0ed6c077aac8e51ccede (diff)
[SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry, so that we can be sure no FileEntry(..., batchId = -1) is ever written into the log. This also prevents the value from being misused in the future (#15203 is an example).

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15206 from zsxwing/cleanup.
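For illustration only, here is a minimal, self-contained Scala sketch of the resulting shape of the API (the names follow the diff below, but this is a standalone sketch, not the actual Spark source): newly listed files circulate as plain (path, timestamp) pairs, and a FileEntry, which no longer has a default batchId, is only constructed at the point where the batch is written to the metadata log.

```scala
object FileEntrySketch {
  type Timestamp = Long

  // batchId no longer defaults to -1; callers must pass the real batch id.
  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

  def main(args: Array[String]): Unit = {
    // Files returned by listing are just (path, modification time) pairs.
    val batchFiles: Seq[(String, Timestamp)] = Seq(("file:/tmp/a.txt", 5L), ("file:/tmp/b.txt", 7L))
    val maxBatchId = 1L
    // A concrete batchId is attached only when the entries are written to the metadata log.
    val entries = batchFiles.map { case (path, timestamp) =>
      FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
    }.toArray
    entries.foreach(println)
  }
}
```

Because the constructor now forces an explicit batchId, an entry with the old -1 placeholder can no longer be created by accident.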
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala       | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala  | 24
2 files changed, 31 insertions(+), 30 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5ebc083a7d..be023273db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -59,7 +59,7 @@ class FileStreamSource(
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
metadataLog.allFiles().foreach { entry =>
- seenFiles.add(entry)
+ seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()
@@ -73,14 +73,16 @@ class FileStreamSource(
*/
private def fetchMaxOffset(): LongOffset = synchronized {
// All the new files found - ignore aged files and files that we have seen.
- val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+ val newFiles = fetchAllFiles().filter {
+ case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+ }
// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
batchFiles.foreach { file =>
- seenFiles.add(file)
+ seenFiles.add(file._1, file._2)
logDebug(s"New file: $file")
}
val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@ class FileStreamSource(
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray)
+ metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+ FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+ }.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
@@ -140,12 +144,12 @@ class FileStreamSource(
/**
* Returns a list of files found, sorted by their timestamp.
*/
- private def fetchAllFiles(): Seq[FileEntry] = {
+ private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
- FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+ (status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long
- val NOT_SET = -1L
-
- case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
- extends Serializable
+ case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable
/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
@@ -196,10 +197,10 @@ object FileStreamSource {
private var lastPurgeTimestamp: Timestamp = 0L
/** Add a new file to the map. */
- def add(file: FileEntry): Unit = {
- map.put(file.path, file.timestamp)
- if (file.timestamp > latestTimestamp) {
- latestTimestamp = file.timestamp
+ def add(path: String, timestamp: Timestamp): Unit = {
+ map.put(path, timestamp)
+ if (timestamp > latestTimestamp) {
+ latestTimestamp = timestamp
}
}
@@ -207,10 +208,10 @@ object FileStreamSource {
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we are still tracking, and we have not seen it before.
*/
- def isNewFile(file: FileEntry): Boolean = {
+ def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(path)
}
/** Removes aged entries and returns the number of files removed. */
@@ -230,8 +231,8 @@ object FileStreamSource {
def size: Int = map.size()
- def allEntries: Seq[FileEntry] = {
- map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+ def allEntries: Seq[(String, Timestamp)] = {
+ map.asScala.toSeq
}
}
}
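The purge() body itself is not part of this diff; only its callers and the lastPurgeTimestamp field appear above. A hedged, self-contained sketch of the behavior implied by the surrounding comments (drop entries older than latestTimestamp - maxAgeMs and remember that cutoff as lastPurgeTimestamp) might look like the following; the field and method names are taken from the hunks above, but the purge body is an assumption, not the actual implementation.

```scala
// Standalone sketch of the purge semantics implied by the comments above (assumed, not the Spark source).
class SeenFilesMapSketch(maxAgeMs: Long) {
  private val map = new java.util.HashMap[String, Long]()
  private var latestTimestamp: Long = 0L
  private var lastPurgeTimestamp: Long = 0L

  def add(path: String, timestamp: Long): Unit = {
    map.put(path, timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  /** Removes aged entries and returns the number of files removed (assumed behavior). */
  def purge(): Int = {
    // Assumed cutoff: entries older than (latestTimestamp - maxAgeMs) are dropped,
    // and the cutoff is remembered so isNewFile can compare against it later.
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val iter = map.entrySet().iterator()
    var removed = 0
    while (iter.hasNext) {
      if (iter.next().getValue < lastPurgeTimestamp) {
        iter.remove()
        removed += 1
      }
    }
    removed
  }

  // A file is "new" only if it is not older than the last purge cutoff and has not been seen before.
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !map.containsKey(path)

  def size: Int = map.size()
}
```

Under this sketch, with maxAgeMs = 10, adding "c" at timestamp 16 moves the cutoff to 6, so the earlier entry "a" at timestamp 5 is removed on the next purge, which matches the assertions in the FileStreamSourceSuite changes below.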
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 0795a0527f..3e1e1126f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)
- map.add(FileEntry("a", 5))
+ map.add("a", 5)
assert(map.size == 1)
map.purge()
assert(map.size == 1)
// Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
- map.add(FileEntry("b", 15))
+ map.add("b", 15)
assert(map.size == 2)
map.purge()
assert(map.size == 2)
// Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
- map.add(FileEntry("c", 16))
+ map.add("c", 16)
assert(map.size == 3)
map.purge()
assert(map.size == 2)
// Override existing entry shouldn't change the size
- map.add(FileEntry("c", 25))
+ map.add("c", 25)
assert(map.size == 2)
// Not a new file because we have seen c before
- assert(!map.isNewFile(FileEntry("c", 20)))
+ assert(!map.isNewFile("c", 20))
// Not a new file because timestamp is too old
- assert(!map.isNewFile(FileEntry("d", 5)))
+ assert(!map.isNewFile("d", 5))
// Finally a new file: never seen and not too old
- assert(map.isNewFile(FileEntry("e", 20)))
+ assert(map.isNewFile("e", 20))
}
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)
- map.add(FileEntry("a", 20))
+ map.add("a", 20)
assert(map.size == 1)
// Timestamp 5 should still considered a new file because purge time should be 0
- assert(map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))
// Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
map.purge()
- assert(!map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(!map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))
}
testWithUninterruptibleThread("do not recheck that files exist during getBatch") {