aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-09-22 23:35:08 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-09-22 23:35:08 -0700
commit62ccf27ab4b55e734646678ae78b7e812262d14b (patch)
tree73ed5c72bd4e1456671a4ca85d588ee6bdd0a5e2 /sql/core/src/test
parent947b8c6e3acd671d501f0ed6c077aac8e51ccede (diff)
downloadspark-62ccf27ab4b55e734646678ae78b7e812262d14b.tar.gz
spark-62ccf27ab4b55e734646678ae78b7e812262d14b.tar.bz2
spark-62ccf27ab4b55e734646678ae78b7e812262d14b.zip
[SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not to write any FileEntry(..., batchId = -1) into the log. This also prevents people from misusing it in the future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong Zhu <shixiong@databricks.com> Closes #15206 from zsxwing/cleanup.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala24
1 files changed, 12 insertions, 12 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 0795a0527f..3e1e1126f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)
- map.add(FileEntry("a", 5))
+ map.add("a", 5)
assert(map.size == 1)
map.purge()
assert(map.size == 1)
// Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
- map.add(FileEntry("b", 15))
+ map.add("b", 15)
assert(map.size == 2)
map.purge()
assert(map.size == 2)
// Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
- map.add(FileEntry("c", 16))
+ map.add("c", 16)
assert(map.size == 3)
map.purge()
assert(map.size == 2)
// Override existing entry shouldn't change the size
- map.add(FileEntry("c", 25))
+ map.add("c", 25)
assert(map.size == 2)
// Not a new file because we have seen c before
- assert(!map.isNewFile(FileEntry("c", 20)))
+ assert(!map.isNewFile("c", 20))
// Not a new file because timestamp is too old
- assert(!map.isNewFile(FileEntry("d", 5)))
+ assert(!map.isNewFile("d", 5))
// Finally a new file: never seen and not too old
- assert(map.isNewFile(FileEntry("e", 20)))
+ assert(map.isNewFile("e", 20))
}
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)
- map.add(FileEntry("a", 20))
+ map.add("a", 20)
assert(map.size == 1)
// Timestamp 5 should still considered a new file because purge time should be 0
- assert(map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))
// Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
map.purge()
- assert(!map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(!map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))
}
testWithUninterruptibleThread("do not recheck that files exist during getBatch") {