aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-08-26 16:05:34 -0700
committerReynold Xin <rxin@databricks.com>2016-08-26 16:05:34 -0700
commitf64a1ddd09a34d5d867ccbaba46204d75fad038d (patch)
tree56f564936f93ba8b685c37f6c6001eba5c85adbe /sql/core/src/test/scala
parenta11d10f1826b578ff721c4738224eef2b3c3b9f3 (diff)
downloadspark-f64a1ddd09a34d5d867ccbaba46204d75fad038d.tar.gz
spark-f64a1ddd09a34d5d867ccbaba46204d75fad038d.tar.bz2
spark-f64a1ddd09a34d5d867ccbaba46204d75fad038d.zip
[SPARK-17235][SQL] Support purging of old logs in MetadataLog
## What changes were proposed in this pull request?

This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time.

## How was this patch tested?

Added a unit test case in HDFSMetadataLogSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14802 from petermaxlee/SPARK-17235.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala27
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d253b..4259384f0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
test("FileManager: FileContextManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileContextManager(path, new Configuration))
+ testFileManager(path, new FileContextManager(path, new Configuration))
}
}
test("FileManager: FileSystemManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileSystemManager(path, new Configuration))
+ testFileManager(path, new FileSystemManager(path, new Configuration))
}
}
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.add(1, "batch1"))
+ assert(metadataLog.add(2, "batch2"))
+ assert(metadataLog.get(0).isDefined)
+ assert(metadataLog.get(1).isDefined)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+
+ metadataLog.purge(2)
+ assert(metadataLog.get(0).isEmpty)
+ assert(metadataLog.get(1).isEmpty)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+ }
+ }
+
testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
-
- def testManager(basePath: Path, fm: FileManager): Unit = {
+ /** Basic test case for [[FileManager]] implementation. */
+ private def testFileManager(basePath: Path, fm: FileManager): Unit = {
// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
assert(!fm.exists(dir))