author:    petermaxlee <petermaxlee@gmail.com>  2016-08-26 16:05:34 -0700
committer: Reynold Xin <rxin@databricks.com>    2016-08-26 16:05:34 -0700
commit:    f64a1ddd09a34d5d867ccbaba46204d75fad038d (patch)
tree:      56f564936f93ba8b685c37f6c6001eba5c85adbe /sql
parent:    a11d10f1826b578ff721c4738224eef2b3c3b9f3 (diff)
[SPARK-17235][SQL] Support purging of old logs in MetadataLog
## What changes were proposed in this pull request?

This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required for production structured streaming jobs that run for long periods of time.

## How was this patch tested?

Added a unit test case in HDFSMetadataLogSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14802 from petermaxlee/SPARK-17235.
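Since the purge hook is left unused by this patch, the sketch below only illustrates how a follow-up caller might drive it: a hypothetical retention policy that keeps the last `keepLast` batches. `applyRetention` and `keepLast` are made-up names for illustration, not part of the patch.

```scala
import org.apache.spark.sql.execution.streaming.MetadataLog

// Hypothetical retention policy built on the new purge() hook.
// Nothing here is part of this patch; it only shows the intended call shape.
def applyRetention[T](log: MetadataLog[T], keepLast: Int): Unit = {
  log.getLatest() match {
    case Some((latestBatchId, _)) =>
      // purge() deletes entries strictly below the threshold, so batches
      // latestBatchId - keepLast + 1 .. latestBatchId survive.
      log.purge(latestBatchId - keepLast + 1)
    case None =>
      // Nothing has been committed yet, so there is nothing to purge.
  }
}
```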
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala      | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala          |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 27
3 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b6f76ca28..127ece9ab0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
None
}
+ /**
+ * Removes all log entries earlier than thresholdBatchId (exclusive).
+ */
+ override def purge(thresholdBatchId: Long): Unit = {
+ val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+ .map(f => pathToBatchId(f.getPath))
+
+ for (batchId <- batchIds if batchId < thresholdBatchId) {
+ val path = batchIdToPath(batchId)
+ fileManager.delete(path)
+ logTrace(s"Removed metadata log file: $path")
+ }
+ }
+
private def createFileManager(): FileManager = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
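purge() above leans on a one-file-per-batch layout: each batch id maps to a file named after it under the metadata directory. The real pathToBatchId/batchIdToPath helpers are defined elsewhere in HDFSMetadataLog and do not appear in this hunk; the standalone variants below are an assumption, sketched for illustration only.

```scala
import org.apache.hadoop.fs.Path

// Sketch of the batch-id <-> file-name mapping that purge() relies on:
// one file per batch under the metadata directory, named by the batch id.
// These bodies and signatures are assumptions for illustration; the real
// helpers are members of HDFSMetadataLog.
def batchIdToPath(metadataPath: Path, batchId: Long): Path =
  new Path(metadataPath, batchId.toString)

def pathToBatchId(path: Path): Long =
  path.getName.toLong
```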
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index cc70e1d314..78d6be17df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -48,4 +48,10 @@ trait MetadataLog[T] {
* Return the latest batch Id and its metadata if they exist.
*/
def getLatest(): Option[(Long, T)]
+
+ /**
+ * Removes all log entries earlier than thresholdBatchId (exclusive).
+ * This operation should be idempotent.
+ */
+ def purge(thresholdBatchId: Long): Unit
}
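The contract above (remove everything strictly below the threshold, idempotently) is easy to see against a toy in-memory log. A minimal sketch, not part of the patch and deliberately not a full MetadataLog[T] implementation:

```scala
import scala.collection.mutable

// Toy in-memory log illustrating the purge() contract only.
class InMemoryLog[T] {
  private val batches = mutable.Map.empty[Long, T]

  def add(batchId: Long, metadata: T): Boolean =
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }

  def get(batchId: Long): Option[T] = batches.get(batchId)

  def getLatest(): Option[(Long, T)] =
    if (batches.isEmpty) None else Some(batches.maxBy(_._1))

  // Idempotent: removing an already-removed entry is a no-op, so repeated
  // purge() calls with the same threshold leave the log unchanged.
  def purge(thresholdBatchId: Long): Unit =
    batches.keys.toSeq.filter(_ < thresholdBatchId).foreach(batches.remove)
}
```

Calling purge(2) twice here leaves batches 2 and above untouched both times, which mirrors what the new test below asserts for HDFSMetadataLog.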
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d253b..4259384f0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
test("FileManager: FileContextManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileContextManager(path, new Configuration))
+ testFileManager(path, new FileContextManager(path, new Configuration))
}
}
test("FileManager: FileSystemManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
- testManager(path, new FileSystemManager(path, new Configuration))
+ testFileManager(path, new FileSystemManager(path, new Configuration))
}
}
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.add(1, "batch1"))
+ assert(metadataLog.add(2, "batch2"))
+ assert(metadataLog.get(0).isDefined)
+ assert(metadataLog.get(1).isDefined)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+
+ metadataLog.purge(2)
+ assert(metadataLog.get(0).isEmpty)
+ assert(metadataLog.get(1).isEmpty)
+ assert(metadataLog.get(2).isDefined)
+ assert(metadataLog.getLatest().get._1 == 2)
+ }
+ }
+
testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
-
- def testManager(basePath: Path, fm: FileManager): Unit = {
+ /** Basic test case for [[FileManager]] implementation. */
+ private def testFileManager(basePath: Path, fm: FileManager): Unit = {
// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
assert(!fm.exists(dir))