aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorWangTaoTheTonic <wangtao111@huawei.com>2015-04-23 17:20:17 -0400
committerSean Owen <sowen@cloudera.com>2015-04-23 17:20:17 -0400
commitbaa83a9a6769c5e119438d65d7264dceb8d743d5 (patch)
tree129e93ab31802591ea9919cfb94b9eb149d00fa7 /core
parent3e91cc273d281053618bfa032bc610e2cf8d8e78 (diff)
downloadspark-baa83a9a6769c5e119438d65d7264dceb8d743d5.tar.gz
spark-baa83a9a6769c5e119438d65d7264dceb8d743d5.tar.bz2
spark-baa83a9a6769c5e119438d65d7264dceb8d743d5.zip
[SPARK-6879] [HISTORYSERVER] check if app is completed before cleaning it up
https://issues.apache.org/jira/browse/SPARK-6879 Use `applications` instead of `FileStatus`, and check whether the app is completed before cleaning it up. If an IOException is thrown while deleting an app's log, the app is kept in `appsToClean` to be retried in the next loop. Author: WangTaoTheTonic <wangtao111@huawei.com> Closes #5491 from WangTaoTheTonic/SPARK-6879 and squashes the following commits: 4a533eb [WangTaoTheTonic] treat ACE specially cb45105 [WangTaoTheTonic] rebase d4d5251 [WangTaoTheTonic] per Marcelo's comments d7455d8 [WangTaoTheTonic] slightly change when delete file b0abca5 [WangTaoTheTonic] use global var to store apps to clean 94adfe1 [WangTaoTheTonic] leave expired apps alone to be deleted 9872a9d [WangTaoTheTonic] use the right path fdef4d6 [WangTaoTheTonic] check if app is completed before clean it up
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala32
1 files changed, 20 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9847d5944a..a94ebf6e53 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
/**
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
@@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
+ // List of applications to be deleted by event log cleaner.
+ private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def cleanLogs(): Unit = {
try {
- val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
- .getOrElse(Seq[FileStatus]())
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
val now = System.currentTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ // Scan all logs from the log directory.
+ // Only completed applications older than the specified max age will be deleted.
applications.values.foreach { info =>
- if (now - info.lastUpdated <= maxAge) {
+ if (now - info.lastUpdated <= maxAge || !info.completed) {
appsToRetain += (info.id -> info)
+ } else {
+ appsToClean += info
}
}
applications = appsToRetain
- // Scan all logs from the log directory.
- // Only directories older than the specified max age will be deleted
- statusList.foreach { dir =>
+ val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+ appsToClean.foreach { info =>
try {
- if (now - dir.getModificationTime() > maxAge) {
- // if path is a directory and set to true,
- // the directory is deleted else throws an exception
- fs.delete(dir.getPath, true)
+ val path = new Path(logDir, info.logPath)
+ if (fs.exists(path)) {
+ fs.delete(path, true)
}
} catch {
- case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+ case e: AccessControlException =>
+ logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+ case t: IOException =>
+ logError(s"IOException in cleaning logs of ${info.logPath}", t)
+ leftToClean += info
}
}
+
+ appsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}