diff options
author | xukun 00228947 <xukun.xu@huawei.com> | 2015-02-26 13:24:00 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-26 13:24:00 -0800 |
commit | 8942b522d8a3269a2a357e3a274ed4b3e66ebdde (patch) | |
tree | e1f9a65a0784fb4a3c83ad344f7e853a554b1729 /core | |
parent | 10094a523e3993b775111ae9b22ca31cc0d76e03 (diff) | |
download | spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.tar.gz spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.tar.bz2 spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.zip |
[SPARK-3562]Periodic cleanup event logs
Author: xukun 00228947 <xukun.xu@huawei.com>
Closes #4214 from viper-kun/cleaneventlog and squashes the following commits:
7a5b9c5 [xukun 00228947] fix issue
31674ee [xukun 00228947] fix issue
6e3d06b [xukun 00228947] fix issue
373f3b9 [xukun 00228947] fix issue
71782b5 [xukun 00228947] fix issue
5b45035 [xukun 00228947] fix issue
70c28d6 [xukun 00228947] fix issues
adcfe86 [xukun 00228947] Periodic cleanup event logs
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkConf.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 112 |
2 files changed, 86 insertions, 34 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0dbd26146c..0f4922ab4e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst", "1.3"), DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3", - "Use spark.{driver,executor}.userClassPathFirst instead.")) + "Use spark.{driver,executor}.userClassPathFirst instead."), + DeprecatedConfig("spark.history.fs.updateInterval", + "spark.history.fs.update.interval.seconds", + "1.3", "Use spark.history.fs.update.interval.seconds instead"), + DeprecatedConfig("spark.history.updateInterval", + "spark.history.fs.update.interval.seconds", + "1.3", "Use spark.history.fs.update.interval.seconds instead")) configs.map { x => (x.oldName, x) }.toMap } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 885fa0fdbf..1aaa7b7273 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,9 +17,13 @@ package org.apache.spark.deploy.history -import java.io.{BufferedInputStream, FileNotFoundException, InputStream} +import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream} +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.mutable +import scala.concurrent.duration.Duration + +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.fs.permission.AccessControlException @@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val NOT_STARTED = "<Not Started>" // Interval between each check for event log updates - private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", - conf.getInt("spark.history.updateInterval", 10)) * 1000 + private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds") + .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true))) + .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true))) + .map(_.toInt) + .getOrElse(10) * 1000 + + // Interval between each cleaner checks for event logs to delete + private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds", + DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000 private val logDir = conf.getOption("spark.history.fs.logDirectory") .map { d => Utils.resolveURI(d).toString } @@ -53,8 +64,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // Used by check event thread and clean log thread. + // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task. + private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat("spark-history-task-%d").setDaemon(true).build()) // The modification time of the newest log detected during the last scan. This is used // to ignore logs that are older during subsequent scans, to avoid processing data that @@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. + * Return a runnable that performs the given operation on the event logs. + * This operation is expected to be executed periodically. */ - private val logCheckingThread = new Thread("LogCheckingThread") { - override def run() = Utils.logUncaughtExceptions { - while (true) { - val now = getMonotonicTimeMs() - if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now) - } - checkForLogs() + private def getRunner(operateFun: () => Unit): Runnable = { + new Runnable() { + override def run() = Utils.logUncaughtExceptions { + operateFun() } } } @@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis "Logging directory specified is not a directory: %s".format(logDir)) } - checkForLogs() - // Disable the background thread during tests. if (!conf.contains("spark.testing")) { - logCheckingThread.setDaemon(true) - logCheckingThread.start() + // A task that periodically checks for event log updates on disk. + pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, + TimeUnit.MILLISECONDS) + + if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { + // A task that periodically cleans event logs on disk. + pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, + TimeUnit.MILLISECONDS) + } } } @@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { - lastLogCheckTimeMs = getMonotonicTimeMs() - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) - try { var newLastModifiedTime = lastModifiedTime val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) @@ -231,6 +235,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } /** + * Delete event logs from the log directory according to the clean policy defined by the user. + */ + private def cleanLogs(): Unit = { + try { + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) + .getOrElse(Seq[FileStatus]()) + val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds", + DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 + + val now = System.currentTimeMillis() + val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + + applications.values.foreach { info => + if (now - info.lastUpdated <= maxAge) { + appsToRetain += (info.id -> info) + } + } + + applications = appsToRetain + + // Scan all logs from the log directory. + // Only directories older than the specified max age will be deleted + statusList.foreach { dir => + try { + if (now - dir.getModificationTime() > maxAge) { + // if path is a directory and set to true, + // the directory is deleted else throws an exception + fs.delete(dir.getPath, true) + } + } catch { + case t: IOException => logError(s"IOException in cleaning logs of $dir", t) + } + } + } catch { + case t: Exception => logError("Exception in cleaning logs", t) + } + } + + /** * Comparison function that defines the sort order for the application listing. * * @return Whether `i1` should precede `i2`. @@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - /** Returns the system's mononotically increasing time. */ - private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000) - /** * Return true when the application has completed. */ @@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + // One day + val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds + + // One week + val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds } private class FsApplicationHistoryInfo( |