From 62ccf27ab4b55e734646678ae78b7e812262d14b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 22 Sep 2016 23:35:08 -0700 Subject: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry ## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong Zhu Closes #15206 from zsxwing/cleanup. --- .../streaming/FileStreamSourceSuite.scala | 24 +++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'sql/core/src/test/scala') diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index 0795a0527f..3e1e1126f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { test("SeenFilesMap") { val map = new SeenFilesMap(maxAgeMs = 10) - map.add(FileEntry("a", 5)) + map.add("a", 5) assert(map.size == 1) map.purge() assert(map.size == 1) // Add a new entry and purge should be no-op, since the gap is exactly 10 ms. - map.add(FileEntry("b", 15)) + map.add("b", 15) assert(map.size == 2) map.purge() assert(map.size == 2) // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now. - map.add(FileEntry("c", 16)) + map.add("c", 16) assert(map.size == 3) map.purge() assert(map.size == 2) // Override existing entry shouldn't change the size - map.add(FileEntry("c", 25)) + map.add("c", 25) assert(map.size == 2) // Not a new file because we have seen c before - assert(!map.isNewFile(FileEntry("c", 20))) + assert(!map.isNewFile("c", 20)) // Not a new file because timestamp is too old - assert(!map.isNewFile(FileEntry("d", 5))) + assert(!map.isNewFile("d", 5)) // Finally a new file: never seen and not too old - assert(map.isNewFile(FileEntry("e", 20))) + assert(map.isNewFile("e", 20)) } test("SeenFilesMap should only consider a file old if it is earlier than last purge time") { val map = new SeenFilesMap(maxAgeMs = 10) - map.add(FileEntry("a", 20)) + map.add("a", 20) assert(map.size == 1) // Timestamp 5 should still considered a new file because purge time should be 0 - assert(map.isNewFile(FileEntry("b", 9))) - assert(map.isNewFile(FileEntry("b", 10))) + assert(map.isNewFile("b", 9)) + assert(map.isNewFile("b", 10)) // Once purge, purge time should be 10 and then b would be a old file if it is less than 10. map.purge() - assert(!map.isNewFile(FileEntry("b", 9))) - assert(map.isNewFile(FileEntry("b", 10))) + assert(!map.isNewFile("b", 9)) + assert(map.isNewFile("b", 10)) } testWithUninterruptibleThread("do not recheck that files exist during getBatch") { -- cgit v1.2.3