aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala27
1 files changed, 23 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d253b..4259384f0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
test("FileManager: FileContextManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileContextManager(path, new Configuration))
+ testFileManager(path, new FileContextManager(path, new Configuration))
}
}
test("FileManager: FileSystemManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileSystemManager(path, new Configuration))
+ testFileManager(path, new FileSystemManager(path, new Configuration))
}
}
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.add(1, "batch1"))
+ assert(metadataLog.add(2, "batch2"))
+ assert(metadataLog.get(0).isDefined)
+ assert(metadataLog.get(1).isDefined)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+
+ metadataLog.purge(2)
+ assert(metadataLog.get(0).isEmpty)
+ assert(metadataLog.get(1).isEmpty)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+ }
+ }
+
testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
-
- def testManager(basePath: Path, fm: FileManager): Unit = {
+ /** Basic test case for [[FileManager]] implementation. */
+ private def testFileManager(basePath: Path, fm: FileManager): Unit = {
// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
assert(!fm.exists(dir))