diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-09-22 23:35:08 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-09-22 23:35:08 -0700 |
commit | 62ccf27ab4b55e734646678ae78b7e812262d14b (patch) | |
tree | 73ed5c72bd4e1456671a4ca85d588ee6bdd0a5e2 /sql/core/src/test | |
parent | 947b8c6e3acd671d501f0ed6c077aac8e51ccede (diff) | |
download | spark-62ccf27ab4b55e734646678ae78b7e812262d14b.tar.gz spark-62ccf27ab4b55e734646678ae78b7e812262d14b.tar.bz2 spark-62ccf27ab4b55e734646678ae78b7e812262d14b.zip |
[SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
## What changes were proposed in this pull request?
Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example).
## How was this patch tested?
Jenkins.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #15206 from zsxwing/cleanup.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index 0795a0527f..3e1e1126f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { test("SeenFilesMap") { val map = new SeenFilesMap(maxAgeMs = 10) - map.add(FileEntry("a", 5)) + map.add("a", 5) assert(map.size == 1) map.purge() assert(map.size == 1) // Add a new entry and purge should be no-op, since the gap is exactly 10 ms. - map.add(FileEntry("b", 15)) + map.add("b", 15) assert(map.size == 2) map.purge() assert(map.size == 2) // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now. - map.add(FileEntry("c", 16)) + map.add("c", 16) assert(map.size == 3) map.purge() assert(map.size == 2) // Override existing entry shouldn't change the size - map.add(FileEntry("c", 25)) + map.add("c", 25) assert(map.size == 2) // Not a new file because we have seen c before - assert(!map.isNewFile(FileEntry("c", 20))) + assert(!map.isNewFile("c", 20)) // Not a new file because timestamp is too old - assert(!map.isNewFile(FileEntry("d", 5))) + assert(!map.isNewFile("d", 5)) // Finally a new file: never seen and not too old - assert(map.isNewFile(FileEntry("e", 20))) + assert(map.isNewFile("e", 20)) } test("SeenFilesMap should only consider a file old if it is earlier than last purge time") { val map = new SeenFilesMap(maxAgeMs = 10) - map.add(FileEntry("a", 20)) + map.add("a", 20) assert(map.size == 1) // Timestamp 5 should still considered a new file because purge time should be 0 - assert(map.isNewFile(FileEntry("b", 9))) - assert(map.isNewFile(FileEntry("b", 10))) + assert(map.isNewFile("b", 9)) + assert(map.isNewFile("b", 10)) // Once purge, purge time should be 10 and then b would be a old file if it is less than 10. map.purge() - assert(!map.isNewFile(FileEntry("b", 9))) - assert(map.isNewFile(FileEntry("b", 10))) + assert(!map.isNewFile("b", 9)) + assert(map.isNewFile("b", 10)) } testWithUninterruptibleThread("do not recheck that files exist during getBatch") { |