diff options
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index ab5a2d253b..4259384f0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { test("FileManager: FileContextManager") { withTempDir { temp => val path = new Path(temp.getAbsolutePath) - testManager(path, new FileContextManager(path, new Configuration)) + testFileManager(path, new FileContextManager(path, new Configuration)) } } test("FileManager: FileSystemManager") { withTempDir { temp => val path = new Path(temp.getAbsolutePath) - testManager(path, new FileSystemManager(path, new Configuration)) + testFileManager(path, new FileSystemManager(path, new Configuration)) } } @@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } + testWithUninterruptibleThread("HDFSMetadataLog: purge") { + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.add(1, "batch1")) + assert(metadataLog.add(2, "batch2")) + assert(metadataLog.get(0).isDefined) + assert(metadataLog.get(1).isDefined) + assert(metadataLog.get(2).isDefined) + assert(metadataLog.getLatest().get._1 == 2) + + metadataLog.purge(2) + assert(metadataLog.get(0).isEmpty) + assert(metadataLog.get(1).isEmpty) + assert(metadataLog.get(2).isDefined) + assert(metadataLog.getLatest().get._1 == 2) + } + } + testWithUninterruptibleThread("HDFSMetadataLog: restart") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) @@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - - def testManager(basePath: Path, fm: FileManager): Unit = { + /** Basic test case for [[FileManager]] implementation. */ + private def testFileManager(basePath: Path, fm: FileManager): Unit = { // Mkdirs val dir = new Path(s"$basePath/dir/subdir/subsubdir") assert(!fm.exists(dir)) |