about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-07-30 21:57:32 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-07-30 21:57:32 -0700
commita7c305b86b3b83645ae5ff5d3dfeafc20c443204 (patch)
tree3f941b44a601f55c6fa1a5ea5381d8c50794027d /core
parent118c1c422d3dfbfb2277995062678f0a808af6c3 (diff)
downloadspark-a7c305b86b3b83645ae5ff5d3dfeafc20c443204.tar.gz
spark-a7c305b86b3b83645ae5ff5d3dfeafc20c443204.tar.bz2
spark-a7c305b86b3b83645ae5ff5d3dfeafc20c443204.zip
[SPARK-2340] Resolve event logging and History Server paths properly
We resolve relative paths to the local `file:/` system for `--jars` and `--files` in spark submit (#853). We should do the same for the history server. Author: Andrew Or <andrewor14@gmail.com> Closes #1280 from andrewor14/hist-serv-fix and squashes the following commits: 13ff406 [Andrew Or] Merge branch 'master' of github.com:apache/spark into hist-serv-fix b393e17 [Andrew Or] Strip trailing "/" from logging directory 622a471 [Andrew Or] Fix test in EventLoggingListenerSuite 0e20f71 [Andrew Or] Shift responsibility of resolving paths up one level b037c0c [Andrew Or] Use resolved paths for everything in history server c7e36ee [Andrew Or] Resolve paths for event logging too 40e3933 [Andrew Or] Resolve history server file paths
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala       | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala             |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala           |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala         |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala                        |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala    |  2
7 files changed, 28 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 01e7065c17..6d2d4cef1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
conf.getInt("spark.history.updateInterval", 10)) * 1000
private val logDir = conf.get("spark.history.fs.logDirectory", null)
- if (logDir == null) {
- throw new IllegalArgumentException("Logging directory must be specified.")
- }
+ private val resolvedLogDir = Option(logDir)
+ .map { d => Utils.resolveURI(d) }
+ .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
- private val fs = Utils.getHadoopFileSystem(logDir)
+ private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def initialize() {
// Validate the log directory.
- val path = new Path(logDir)
+ val path = new Path(resolvedLogDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
- "Logging directory specified does not exist: %s".format(logDir))
+ "Logging directory specified does not exist: %s".format(resolvedLogDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(logDir))
+ "Logging directory specified is not a directory: %s".format(resolvedLogDir))
}
checkForLogs()
@@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
override def getAppUI(appId: String): SparkUI = {
try {
- val appLogDir = fs.getFileStatus(new Path(logDir, appId))
- loadAppInfo(appLogDir, true)._2
+ val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
+ val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
+ ui
} catch {
case e: FileNotFoundException => null
}
}
override def getConfig(): Map[String, String] =
- Map(("Event Log Location" -> logDir))
+ Map("Event Log Location" -> resolvedLogDir.toString)
/**
* Builds the application list based on the current contents of the log directory.
@@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
- val logStatus = fs.listStatus(new Path(logDir))
+ val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs.filter {
- dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+ val logInfos = logDirs.filter { dir =>
+ fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}
val currentApps = Map[String, ApplicationHistoryInfo](
- appList.map(app => (app.id -> app)):_*)
+ appList.map(app => app.id -> app):_*)
// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
@@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
- newApps += loadAppInfo(dir, false)._1
+ val (app, _) = loadAppInfo(dir, renderUI = false)
+ newApps += app
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
}
@@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
- val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
val path = logDir.getPath
val appId = path.getName
+ val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index d7a3e3f120..c4ef8b63b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
- { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
+ {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (allApps.size > 0) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index cacb9da8c9..d1a64c1912 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.SignalLogger
/**
* A web server that renders SparkUIs of completed applications.
@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
- val args = new HistoryServerArguments(conf, argStrings)
+ new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
val providerName = conf.getOption("spark.history.provider")
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index be9361b754..25fc76c23e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
+ conf.set("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
- if (logDir != null) {
- conf.set("spark.history.fs.logDirectory", logDir)
- }
}
private def printUsageAndExit(exitCode: Int) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index ae6ca9f4e7..406147f167 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -29,7 +29,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{FileLogger, JsonProtocol}
+import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
/**
* A SparkListener that logs events to persistent storage.
@@ -55,7 +55,7 @@ private[spark] class EventLoggingListener(
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
- val logDir = logBaseDir + "/" + name
+ val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -215,7 +215,7 @@ private[spark] object EventLoggingListener extends Logging {
} catch {
case e: Exception =>
logError("Exception in parsing logging info from directory %s".format(logDir), e)
- EventLoggingInfo.empty
+ EventLoggingInfo.empty
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 9dcdafdd63..2e8fbf5a91 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -52,7 +52,7 @@ private[spark] class FileLogger(
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
- private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+ private val fileSystem = Utils.getHadoopFileSystem(logDir)
var fileIndex = 0
// Only used if compression is enabled
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 21e3db34b8..10d8b29931 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -259,7 +259,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val expectedLogDir = logDirPath.toString
- assert(eventLogger.logDir.startsWith(expectedLogDir))
+ assert(eventLogger.logDir.contains(expectedLogDir))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)