 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala      | 37 ++++++++++++++++++++++++-------------
 core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala            |  8 +++++++-
 core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala | 15 ++++++++-------
 docs/monitoring.md                                                               |  2 +-
 4 files changed, 40 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2d1609b973..82a54dbfb5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -29,22 +29,27 @@ import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
+/**
+ * A class that provides application history from event logs stored in the file system.
+ * This provider checks for new finished applications in the background periodically and
+ * renders the application history UI by parsing the associated event logs.
+ */
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {
+ import FsHistoryProvider._
+
private val NOT_STARTED = "<Not Started>"
// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000
- private val logDir = conf.get("spark.history.fs.logDirectory", null)
- private val resolvedLogDir = Option(logDir)
- .map { d => Utils.resolveURI(d) }
- .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
+ private val logDir = conf.getOption("spark.history.fs.logDirectory")
+ .map { d => Utils.resolveURI(d).toString }
+ .getOrElse(DEFAULT_LOG_DIR)
- private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
- SparkHadoopUtil.get.newConfiguration(conf))
+ private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -87,14 +92,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def initialize() {
// Validate the log directory.
- val path = new Path(resolvedLogDir)
+ val path = new Path(logDir)
if (!fs.exists(path)) {
- throw new IllegalArgumentException(
- "Logging directory specified does not exist: %s".format(resolvedLogDir))
+ var msg = s"Log directory specified does not exist: $logDir."
+ if (logDir == DEFAULT_LOG_DIR) {
+ msg += " Did you configure the correct one through spark.fs.history.logDirectory?"
+ }
+ throw new IllegalArgumentException(msg)
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(resolvedLogDir))
+ "Logging directory specified is not a directory: %s".format(logDir))
}
checkForLogs()
@@ -134,8 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}
- override def getConfig(): Map[String, String] =
- Map("Event Log Location" -> resolvedLogDir.toString)
+ override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
/**
* Builds the application list based on the current contents of the log directory.
@@ -146,7 +153,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
- val logStatus = fs.listStatus(new Path(resolvedLogDir))
+ val logStatus = fs.listStatus(new Path(logDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
// Load all new logs from the log directory. Only directories that have a modification time
@@ -244,6 +251,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
+private object FsHistoryProvider {
+ val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+}
+
private class FsApplicationHistoryInfo(
val logDir: String,
id: String,
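
The core behavioral change in FsHistoryProvider above is the fall-back: spark.history.fs.logDirectory is now read through getOption and defaults to DEFAULT_LOG_DIR instead of throwing when unset. A minimal, self-contained Scala sketch of that resolution pattern follows; Conf and resolveURI here are simplified stand-ins for SparkConf and Utils.resolveURI, not the real Spark APIs.

    import java.net.URI

    // Simplified stand-in for SparkConf: a key/value map with getOption semantics.
    class Conf(settings: Map[String, String]) {
      def getOption(key: String): Option[String] = settings.get(key)
    }

    object LogDirResolution {
      // Mirrors FsHistoryProvider.DEFAULT_LOG_DIR from the patch.
      val DEFAULT_LOG_DIR = "file:/tmp/spark-events"

      // Simplified stand-in for Utils.resolveURI: bare paths become file: URIs.
      def resolveURI(path: String): URI = {
        val uri = new URI(path)
        if (uri.getScheme != null) uri
        else new URI("file", null, new java.io.File(path).getAbsolutePath, null)
      }

      // The same getOption/map/getOrElse chain used by the provider.
      def resolveLogDir(conf: Conf): String =
        conf.getOption("spark.history.fs.logDirectory")
          .map(d => resolveURI(d).toString)
          .getOrElse(DEFAULT_LOG_DIR)
    }

With this in place, resolveLogDir(new Conf(Map.empty)) yields "file:/tmp/spark-events", while an explicit setting such as "/var/log/spark" resolves to "file:/var/log/spark"; the unset case no longer raises an IllegalArgumentException.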
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 0e249e51a7..5fdc350cd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -58,7 +58,13 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</h4> ++
appTable
} else {
- <h4>No Completed Applications Found</h4>
+ <h4>No completed applications found!</h4> ++
+ <p>Did you specify the correct logging directory?
+ Please verify your setting of <span style="font-style:italic">
spark.history.fs.logDirectory</span> and whether you have permission to
+ access it.<br /> It is also possible that your application did not run to
+ completion or did not stop the SparkContext.
+ </p>
}
}
</div>
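
The expanded empty-state message above is worth pairing with the driver-side requirement it hints at: the file-system provider only lists applications whose event logs were written and closed out, which requires event logging to be enabled and SparkContext.stop() to be called. A minimal driver sketch, assuming the default log directory introduced by this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object CompletedApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("completed-app")
          .set("spark.eventLog.enabled", "true")                // write event logs at all
          .set("spark.eventLog.dir", "file:/tmp/spark-events")  // same dir the history server reads
        val sc = new SparkContext(conf)
        try {
          sc.parallelize(1 to 100).sum()
        } finally {
          // Without stop(), the event log is never marked complete and the
          // application stays invisible on the history page.
          sc.stop()
        }
      }
    }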
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 5bce32a04d..b1270ade9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,14 +17,13 @@
package org.apache.spark.deploy.history
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils
/**
* Command-line parser for the history server.
*/
-private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
- private var logDir: String = null
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging {
private var propertiesFile: String = null
parse(args.toList)
@@ -32,7 +31,8 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
private def parse(args: List[String]): Unit = {
args match {
case ("--dir" | "-d") :: value :: tail =>
- logDir = value
+ logWarning("Setting log directory through the command line is deprecated as of " +
+ "Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead.")
conf.set("spark.history.fs.logDirectory", value)
System.setProperty("spark.history.fs.logDirectory", value)
parse(tail)
@@ -78,9 +78,10 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
| (default 50)
|FsHistoryProvider options:
|
- | spark.history.fs.logDirectory Directory where app logs are stored (required)
- | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
- | default 10)
+ | spark.history.fs.logDirectory Directory where app logs are stored
+ | (default: file:/tmp/spark-events)
+ | spark.history.fs.updateInterval How often to reload log data from storage
+ | (in seconds, default: 10)
|""".stripMargin)
System.exit(exitCode)
}
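
HistoryServerArguments walks the argument list recursively, matching a flag and its value at the head and recursing on the tail; the --dir case now just forwards into the config key it deprecates. A stripped-down sketch of that parsing technique with no Spark dependencies (the flag names are copied from the patch; everything else is illustrative):

    object ParseSketch {
      private var propertiesFile: String = null

      // Match a flag plus its value at the head of the list, record it,
      // then recurse on the remainder -- the same shape as parse() above.
      private def parse(args: List[String]): Unit = args match {
        case ("--dir" | "-d") :: value :: tail =>
          Console.err.println("--dir is deprecated; set spark.history.fs.logDirectory instead")
          sys.props("spark.history.fs.logDirectory") = value
          parse(tail)
        case "--properties-file" :: value :: tail =>
          propertiesFile = value
          parse(tail)
        case Nil => // all arguments consumed
        case other =>
          Console.err.println(s"Unrecognized option: ${other.head}")
          sys.exit(1)
      }

      def main(args: Array[String]): Unit = parse(args.toList)
    }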
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e3f81a76ac..f32cdef240 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -79,7 +79,7 @@ follows:
</tr>
<tr>
<td>spark.history.fs.logDirectory</td>
- <td>(none)</td>
+ <td>file:/tmp/spark-events</td>
<td>
Directory that contains application event logs to be loaded by the history server
</td>
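
With the documented default in place, a history provider constructed from an empty configuration has a well-defined log location. A hedged sketch of checking that end to end; it assumes the provider validates the directory when constructed, and it must live in the org.apache.spark.deploy.history package because the class is private[history]:

    package org.apache.spark.deploy.history

    import org.apache.spark.SparkConf

    object DefaultDirCheck {
      def main(args: Array[String]): Unit = {
        // The default file:/tmp/spark-events must exist, or initialization
        // fails with the new IllegalArgumentException message.
        new java.io.File("/tmp/spark-events").mkdirs()
        val provider = new FsHistoryProvider(new SparkConf())
        println(provider.getConfig())  // Map(Event log directory -> file:/tmp/spark-events)
      }
    }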