aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala7
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala4
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala2
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala16
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala3
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala5
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala2
11 files changed, 46 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a8c9ac0724..01e7065c17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
- new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+ new SparkUI(conf, appSecManager, replayBus, appId,
+ HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index a958c837c2..d7a3e3f120 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val uiAddress = "/history/" + info.id
+ val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56b38ddfc9..cacb9da8c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -114,7 +114,7 @@ class HistoryServer(
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
val contextHandler = new ServletContextHandler
- contextHandler.setContextPath("/history")
+ contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
@@ -172,6 +172,8 @@ class HistoryServer(
object HistoryServer extends Logging {
private val conf = new SparkConf
+ val UI_PATH_PREFIX = "/history"
+
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index bb1fcc8190..21f8667819 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -664,9 +665,10 @@ private[spark] class Master(
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
+ val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogDir = app.desc.eventLogDir.getOrElse {
// Event logging is not enabled for this application
- app.desc.appUiUrl = "/history/not-found"
+ app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
@@ -681,13 +683,14 @@ private[spark] class Master(
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
- app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
+ app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
- val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
+ val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
+ HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
@@ -702,7 +705,7 @@ private[spark] class Master(
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
- app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title"
+ app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a90b0d475c..ae6ca9f4e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -64,6 +64,13 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
/**
+ * Return only the unique application directory without the base directory.
+ */
+ def getApplicationLogDir(): String = {
+ name
+ }
+
+ /**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
*/
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3ec36487dc..62b5c3bc5f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -60,6 +60,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
+ private var uiHistoryAddress: String = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
@@ -237,6 +238,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIHostPort
+ uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
@@ -360,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+ finishReq.setTrackingUrl(uiHistoryAddress)
resourceManager.finishApplicationMaster(finishReq)
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a86ad256df..d232c18d2f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -289,7 +289,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
- finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+ finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 718cb19f57..e98308cdbd 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
/**
@@ -132,4 +135,17 @@ object YarnSparkHadoopUtil {
}
}
+ def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
+ val eventLogDir = sc.eventLogger match {
+ case Some(logger) => logger.getApplicationLogDir()
+ case None => ""
+ }
+ val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
+ if (historyServerAddress != "" && eventLogDir != "") {
+ historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
+ } else {
+ ""
+ }
+ }
+
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d8266f7b0c..77b91f8e26 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import scala.collection.mutable.ArrayBuffer
@@ -54,6 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
+ conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eaf594c8b4..035356d390 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -59,6 +59,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
+ private var uiHistoryAddress: String = _
private val maxAppAttempts: Int = conf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
private var isLastAMRetry: Boolean = true
@@ -216,6 +217,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
if (sparkContext != null) {
uiAddress = sparkContext.ui.appUIHostPort
+ uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
amClient,
@@ -312,8 +314,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
logInfo("Unregistering ApplicationMaster with " + status)
if (registered) {
- val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
- amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
}
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 5ac95f3798..7158d9442a 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -250,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("Unregistering ApplicationMaster with " + status)
- val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+ val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
}