author     xukun 00228947 <xukun.xu@huawei.com>    2015-02-26 13:24:00 -0800
committer  Andrew Or <andrew@databricks.com>       2015-02-26 13:24:00 -0800
commit     8942b522d8a3269a2a357e3a274ed4b3e66ebdde (patch)
tree       e1f9a65a0784fb4a3c83ad344f7e853a554b1729
parent     10094a523e3993b775111ae9b22ca31cc0d76e03 (diff)
download   spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.tar.gz
           spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.tar.bz2
           spark-8942b522d8a3269a2a357e3a274ed4b3e66ebdde.zip
[SPARK-3562] Periodic cleanup of event logs

Author: xukun 00228947 <xukun.xu@huawei.com>

Closes #4214 from viper-kun/cleaneventlog and squashes the following commits:

7a5b9c5 [xukun 00228947] fix issue
31674ee [xukun 00228947] fix issue
6e3d06b [xukun 00228947] fix issue
373f3b9 [xukun 00228947] fix issue
71782b5 [xukun 00228947] fix issue
5b45035 [xukun 00228947] fix issue
70c28d6 [xukun 00228947] fix issues
adcfe86 [xukun 00228947] Periodic cleanup event logs
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                        |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 112
-rw-r--r--  docs/monitoring.md                                                           |  25
3 files changed, 110 insertions(+), 35 deletions(-)
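
In outline, the patch replaces FsHistoryProvider's hand-rolled log-checking thread with a single-threaded scheduler that runs both the existing update check and the new cleaner task. Below is a minimal, self-contained sketch of that scheduling approach (illustrative only; the object name and the stub task bodies are hypothetical, not the actual FsHistoryProvider code):

```scala
import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

// Illustrative sketch of the scheduling approach taken by this patch.
object HistoryTasksSketch {

  // One daemon thread runs both tasks, so the update check and the cleaner never overlap.
  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
    .setNameFormat("spark-history-task-%d").setDaemon(true).build())

  // Wrap an operation so an uncaught exception does not kill the scheduler thread.
  private def getRunner(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try operateFun() catch { case e: Exception => e.printStackTrace() }
  }

  def start(updateIntervalMs: Long, cleanIntervalMs: Long, cleanerEnabled: Boolean): Unit = {
    // Periodically check the log directory for new or updated event logs.
    pool.scheduleAtFixedRate(getRunner(() => checkForLogs()), 0, updateIntervalMs,
      TimeUnit.MILLISECONDS)
    if (cleanerEnabled) {
      // Periodically delete event logs older than the configured max age.
      pool.scheduleAtFixedRate(getRunner(() => cleanLogs()), 0, cleanIntervalMs,
        TimeUnit.MILLISECONDS)
    }
  }

  private def checkForLogs(): Unit = { /* scan the log directory for updates */ }
  private def cleanLogs(): Unit = { /* delete logs older than the configured max age */ }
}
```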
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0dbd26146c..0f4922ab4e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
- "Use spark.{driver,executor}.userClassPathFirst instead."))
+ "Use spark.{driver,executor}.userClassPathFirst instead."),
+ DeprecatedConfig("spark.history.fs.updateInterval",
+ "spark.history.fs.update.interval.seconds",
+ "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+ DeprecatedConfig("spark.history.updateInterval",
+ "spark.history.fs.update.interval.seconds",
+ "1.3", "Use spark.history.fs.update.interval.seconds instead"))
configs.map { x => (x.oldName, x) }.toMap
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 885fa0fdbf..1aaa7b7273 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,9 +17,13 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.AccessControlException
@@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val NOT_STARTED = "<Not Started>"
// Interval between each check for event log updates
- private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
- conf.getInt("spark.history.updateInterval", 10)) * 1000
+ private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+ .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+ .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+ .map(_.toInt)
+ .getOrElse(10) * 1000
+
+ // Interval at which the cleaner checks for event logs to delete
+ private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
+ DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
@@ -53,8 +64,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTimeMs = -1L
+ // Used by both the log-check task and the log-clean task.
+ // The scheduled thread pool size must be one; otherwise the check task and the clean task
+ // would access fs and applications concurrently.
+ private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat("spark-history-task-%d").setDaemon(true).build())
// The modification time of the newest log detected during the last scan. This is used
// to ignore logs that are older during subsequent scans, to avoid processing data that
@@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
/**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
+ * Return a runnable that performs the given operation on the event logs.
+ * This operation is expected to be executed periodically.
*/
- private val logCheckingThread = new Thread("LogCheckingThread") {
- override def run() = Utils.logUncaughtExceptions {
- while (true) {
- val now = getMonotonicTimeMs()
- if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
- }
- checkForLogs()
+ private def getRunner(operateFun: () => Unit): Runnable = {
+ new Runnable() {
+ override def run() = Utils.logUncaughtExceptions {
+ operateFun()
}
}
}
@@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
"Logging directory specified is not a directory: %s".format(logDir))
}
- checkForLogs()
-
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
- logCheckingThread.setDaemon(true)
- logCheckingThread.start()
+ // A task that periodically checks for event log updates on disk.
+ pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
+ TimeUnit.MILLISECONDS)
+
+ if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+ // A task that periodically cleans event logs on disk.
+ pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
+ TimeUnit.MILLISECONDS)
+ }
}
}
@@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* applications that haven't been updated since last time the logs were checked.
*/
private[history] def checkForLogs(): Unit = {
- lastLogCheckTimeMs = getMonotonicTimeMs()
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-
try {
var newLastModifiedTime = lastModifiedTime
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
@@ -231,6 +235,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
/**
+ * Delete event logs from the log directory according to the clean policy defined by the user.
+ */
+ private def cleanLogs(): Unit = {
+ try {
+ val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+ .getOrElse(Seq[FileStatus]())
+ val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+ DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+ val now = System.currentTimeMillis()
+ val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+ applications.values.foreach { info =>
+ if (now - info.lastUpdated <= maxAge) {
+ appsToRetain += (info.id -> info)
+ }
+ }
+
+ applications = appsToRetain
+
+ // Scan all logs from the log directory.
+ // Only directories older than the specified max age will be deleted
+ statusList.foreach { dir =>
+ try {
+ if (now - dir.getModificationTime() > maxAge) {
+ // recursive = true deletes the directory and its contents;
+ // deleting a non-empty directory with recursive = false would throw an exception
+ fs.delete(dir.getPath, true)
+ }
+ } catch {
+ case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+ }
+ }
+ } catch {
+ case t: Exception => logError("Exception in cleaning logs", t)
+ }
+ }
+
+ /**
* Comparison function that defines the sort order for the application listing.
*
* @return Whether `i1` should precede `i2`.
@@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}
- /** Returns the system's mononotically increasing time. */
- private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
-
/**
* Return true when the application has completed.
*/
@@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ // One day
+ val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+ // One week
+ val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
}
private class FsApplicationHistoryInfo(
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 009a344dff..37ede476c1 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -86,7 +86,7 @@ follows:
</td>
</tr>
<tr>
- <td>spark.history.fs.updateInterval</td>
+ <td>spark.history.fs.update.interval.seconds</td>
<td>10</td>
<td>
The period, in seconds, at which information displayed by this history server is updated.
@@ -145,6 +145,29 @@ follows:
If disabled, no access control checks are made.
</td>
</tr>
+ <tr>
+ <td>spark.history.fs.cleaner.enabled</td>
+ <td>false</td>
+ <td>
+ Specifies whether the History Server should periodically clean up event logs from storage.
+ </td>
+ </tr>
+ <tr>
+ <td>spark.history.fs.cleaner.interval.seconds</td>
+ <td>86400</td>
+ <td>
+ How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
+ Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
+ </td>
+ </tr>
+ <tr>
+ <td>spark.history.fs.cleaner.maxAge.seconds</td>
+ <td>3600 * 24 * 7</td>
+ <td>
+ Job history files older than this many seconds will be deleted when the history cleaner runs.
+ Defaults to 3600 * 24 * 7 (1 week).
+ </td>
+ </tr>
</table>
Note that in all of these UIs, the tables are sortable by clicking their headers,
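
As a usage sketch for the settings documented above (values are illustrative, not defaults from this patch): the history server normally picks these up from its configuration, for example spark-defaults.conf or SPARK_HISTORY_OPTS, and the same keys can be read back through the standard SparkConf getters.

```scala
import org.apache.spark.SparkConf

// Hypothetical values for illustration; only the key names come from this patch.
val conf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true")
  .set("spark.history.fs.cleaner.interval.seconds", "43200")                   // check twice a day
  .set("spark.history.fs.cleaner.maxAge.seconds", (3600L * 24 * 14).toString)  // keep two weeks

// Reading them back with the defaults described in the table above.
val cleanerEnabled  = conf.getBoolean("spark.history.fs.cleaner.enabled", false)
val cleanIntervalMs = conf.getLong("spark.history.fs.cleaner.interval.seconds", 3600L * 24) * 1000
val maxAgeMs        = conf.getLong("spark.history.fs.cleaner.maxAge.seconds", 3600L * 24 * 7) * 1000
```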