Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala          |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                   | 109
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala           |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala       |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala   |  73
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala            |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala           |  40
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala          |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala  |  30
9 files changed, 16 insertions(+), 263 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 4ffb5283e9..53564d0e95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -41,7 +41,6 @@ private[spark] class ApplicationInfo(
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
- @transient @volatile var appUIUrlAtHistoryServer: Option[String] = None
// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
@@ -66,7 +65,6 @@ private[spark] class ApplicationInfo(
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
- appUIUrlAtHistoryServer = None
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -136,11 +134,4 @@ private[spark] class ApplicationInfo(
System.currentTimeMillis() - startTime
}
}
-
- /**
- * Returns the original application UI URL, unless its address at the history
- * server is defined
- */
- def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)
-
}
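
The removed curAppUIUrl was a one-line Option fallback: prefer the history-server address once a background rebuild has recorded one, otherwise fall back to the driver's live UI URL. A minimal, self-contained sketch of that pattern follows (class and field shapes abbreviated from the original, not Spark's real types):

object UrlFallbackSketch {
  final case class AppDesc(appUiUrl: String)

  final class AppInfo(val desc: AppDesc) {
    // Written by a background rebuild thread, read by the web UI thread;
    // hence the original field was @volatile.
    @volatile var appUIUrlAtHistoryServer: Option[String] = None

    // Prefer the history-server address when one has been recorded.
    def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)
  }

  def main(args: Array[String]): Unit = {
    val app = new AppInfo(AppDesc("http://driver-host:4040"))
    println(app.curAppUIUrl)                       // http://driver-host:4040
    app.appUIUrlAtHistoryServer = Some("/history/app-1")
    println(app.curAppUIUrl)                       // /history/app-1
  }
}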
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index edc9be2a8a..faed4f4dc9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,25 +17,17 @@
package org.apache.spark.deploy.master
-import java.io.FileNotFoundException
-import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
-import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.Duration
-import scala.language.postfixOps
import scala.util.Random
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -43,9 +35,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}
private[deploy] class Master(
@@ -59,10 +49,6 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
- private val rebuildUIThread =
- ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
- private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
-
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -85,8 +71,6 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
- // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
- private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -199,7 +183,6 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
- rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
@@ -391,9 +374,6 @@ private[deploy] class Master(
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
- case AttachCompletedRebuildUI(appId) =>
- // An asyncRebuildSparkUI has completed, so need to attach to master webUi
- Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -844,7 +824,6 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
- Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
}
completedApps.trimStart(toRemove)
@@ -852,9 +831,6 @@ private[deploy] class Master(
completedApps += app // Remember it in our history
waitingApps -= app
- // If application events are logged, use them to rebuild the UI
- asyncRebuildSparkUI(app)
-
for (exec <- app.executors.values) {
killExecutor(exec)
}
@@ -953,89 +929,6 @@ private[deploy] class Master(
exec.state = ExecutorState.KILLED
}
- /**
- * Rebuild a new SparkUI from the given application's event logs.
- * Return the UI if successful, else None
- */
- private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
- val futureUI = asyncRebuildSparkUI(app)
- ThreadUtils.awaitResult(futureUI, Duration.Inf)
- }
-
- /** Rebuild a new SparkUI asynchronously to not block RPC event loop */
- private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
- val appName = app.desc.name
- val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
- val eventLogDir = app.desc.eventLogDir
- .getOrElse {
- // Event logging is disabled for this application
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
- return Future.successful(None)
- }
- val futureUI = Future {
- val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
- val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
- val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
- EventLoggingListener.IN_PROGRESS))
-
- val eventLogFile = if (inProgressExists) {
- // Event logging is enabled for this application, but the application is still in progress
- logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
- eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
- } else {
- eventLogFilePrefix
- }
-
- val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
- val replayBus = new ReplayListenerBus()
- val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
- appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
- try {
- replayBus.replay(logInput, eventLogFile, inProgressExists)
- } finally {
- logInput.close()
- }
-
- Some(ui)
- }(rebuildUIContext)
-
- futureUI.onSuccess { case Some(ui) =>
- appIdToUI.put(app.id, ui)
- // `self` can be null if we are already in the process of shutting down
- // This happens frequently in tests where `local-cluster` is used
- if (self != null) {
- self.send(AttachCompletedRebuildUI(app.id))
- }
- // Application UI is successfully rebuilt, so link the Master UI to it
- // NOTE - app.appUIUrlAtHistoryServer is volatile
- app.appUIUrlAtHistoryServer = Some(ui.basePath)
- }(ThreadUtils.sameThread)
-
- futureUI.onFailure {
- case fnf: FileNotFoundException =>
- // Event logging is enabled for this application, but no event logs are found
- val title = s"Application history not found (${app.id})"
- var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}."
- logWarning(msg)
- msg += " Did you specify the correct logging directory?"
- msg = URLEncoder.encode(msg, "UTF-8")
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
-
- case e: Exception =>
- // Relay exception message to application UI page
- val title = s"Application history load error (${app.id})"
- val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
- var msg = s"Exception in replaying log for application $appName!"
- logError(msg, e)
- msg = URLEncoder.encode(msg, "UTF-8")
- app.appUIUrlAtHistoryServer =
- Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
- }(ThreadUtils.sameThread)
-
- futureUI
- }
-
/** Generate a new app ID given an app's submission date */
private def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index a055d09767..a952cee36e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -39,6 +39,4 @@ private[master] object MasterMessages {
case object BoundPortsRequest
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
-
- case class AttachCompletedRebuildUI(appId: String)
}
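
The deleted AttachCompletedRebuildUI message existed only so the rebuild thread could hand the finished UI back to the Master's own event loop via self.send. A minimal sketch of that one-field message and its receive case, with the RpcEndpoint plumbing elided:

object MessageSketch {
  sealed trait MasterMessage
  final case class AttachCompletedRebuildUI(appId: String) extends MasterMessage

  // Stand-in for the removed case in Master.receive.
  def receive(msg: MasterMessage): Unit = msg match {
    case AttachCompletedRebuildUI(appId) =>
      println(s"would attach the rebuilt UI for $appId to the master web UI")
  }

  def main(args: Array[String]): Unit =
    receive(AttachCompletedRebuildUI("app-1"))
}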
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 96274958d1..8875fc2232 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -75,7 +75,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
- <li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
+ {
+ if (!app.isFinished) {
+ <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+ }
+ }
</ul>
</div>
</div>
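
The replacement markup leans on a scala-xml behavior: an if-expression without an else evaluates to Unit when the condition is false, and NodeBuffer silently drops Unit, so nothing is rendered. A small sketch of that conditional-rendering pattern (assumes the scala-xml module is on the classpath):

import scala.xml.Node

object ConditionalXmlSketch {
  // When finished is true, the else-less if yields (), which scala.xml's
  // NodeBuffer drops, so no <li> appears in the output.
  def detailLink(finished: Boolean, uiUrl: String): Node =
    <ul>
      {
        if (!finished) {
          <li><a href={uiUrl}>Application Detail UI</a></li>
        }
      }
    </ul>

  def main(args: Array[String]): Unit = {
    println(detailLink(finished = false, uiUrl = "http://driver-host:4040"))
    println(detailLink(finished = true, uiUrl = "http://driver-host:4040"))
  }
}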
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
deleted file mode 100644
index e021f1eef7..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master.ui
-
-import java.net.URLDecoder
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[ui] class HistoryNotFoundPage(parent: MasterWebUI)
- extends WebUIPage("history/not-found") {
-
- /**
- * Render a page that conveys failure in loading application history.
- *
- * This accepts 3 HTTP parameters:
- * msg = message to display to the user
- * title = title of the page
- * exception = detailed description of the exception in loading application history (if any)
- *
- * Parameters "msg" and "exception" are assumed to be UTF-8 encoded.
- */
- def render(request: HttpServletRequest): Seq[Node] = {
- val titleParam = request.getParameter("title")
- val msgParam = request.getParameter("msg")
- val exceptionParam = request.getParameter("exception")
-
- // If no parameters are specified, assume the user did not enable event logging
- val defaultTitle = "Event logging is not enabled"
- val defaultContent =
- <div class="row-fluid">
- <div class="span12" style="font-size:14px">
- No event logs were found for this application! To
- <a href="http://spark.apache.org/docs/latest/monitoring.html">enable event logging</a>,
- set <span style="font-style:italic">spark.eventLog.enabled</span> to true and
- <span style="font-style:italic">spark.eventLog.dir</span> to the directory to which your
- event logs are written.
- </div>
- </div>
-
- val title = Option(titleParam).getOrElse(defaultTitle)
- val content = Option(msgParam)
- .map { msg => URLDecoder.decode(msg, "UTF-8") }
- .map { msg =>
- <div class="row-fluid">
- <div class="span12" style="font-size:14px">{msg}</div>
- </div> ++
- Option(exceptionParam)
- .map { e => URLDecoder.decode(e, "UTF-8") }
- .map { e => <pre>{e}</pre> }
- .getOrElse(Seq.empty)
- }.getOrElse(defaultContent)
-
- UIUtils.basicSparkPage(content, title)
- }
-}
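
The deleted page assumed its msg and exception parameters arrived UTF-8 URL-encoded, matching how the removed Master code encoded them before building the redirect URL. A sketch of that round trip using plain JDK classes:

import java.net.{URLDecoder, URLEncoder}

object ParamRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val msg = "No event logs found for application app-1!"
    // Master-side: encode before embedding in the ?msg= query string.
    val encoded = URLEncoder.encode(msg, "UTF-8")
    println(encoded)
    // Page-side: decode before rendering, as the deleted render() did.
    println(URLDecoder.decode(encoded, "UTF-8"))
  }
}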
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 363f4b84f8..75de3ede78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{killLink}
</td>
<td>
- <a href={app.curAppUIUrl}>{app.desc.name}</a>
+ {
+ if (app.isFinished) {
+ app.desc.name
+ } else {
+ <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+ }
+ }
</td>
<td>
{app.coresGranted}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index ae16ce90c8..a0727ad83f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
-import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
- UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
@@ -30,60 +28,26 @@ import org.apache.spark.ui.JettyUtils._
private[master]
class MasterWebUI(
val master: Master,
- requestedPort: Int,
- customMasterPage: Option[MasterPage] = None)
+ requestedPort: Int)
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
- requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
+ requestedPort, master.conf, name = "MasterUI") with Logging {
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
- val masterPage = customMasterPage.getOrElse(new MasterPage(this))
-
initialize()
/** Initialize all components of the server. */
def initialize() {
val masterPage = new MasterPage(this)
attachPage(new ApplicationPage(this))
- attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
- attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
-
- /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
- def attachSparkUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
- ui.getHandlers.foreach(attachHandler)
- }
-
- /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
- def detachSparkUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
- ui.getHandlers.foreach(detachHandler)
- }
-
- def getApplicationInfoList: Iterator[ApplicationInfo] = {
- val state = masterPage.getMasterState
- val activeApps = state.activeApps.sortBy(_.startTime).reverse
- val completedApps = state.completedApps.sortBy(_.endTime).reverse
- activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
- completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
- }
-
- def getSparkUI(appId: String): Option[SparkUI] = {
- val state = masterPage.getMasterState
- val activeApps = state.activeApps.sortBy(_.startTime).reverse
- val completedApps = state.completedApps.sortBy(_.endTime).reverse
- (activeApps ++ completedApps).find { _.id == appId }.flatMap {
- master.rebuildSparkUI
- }
- }
}
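
The removed getApplicationInfoList merged the two app lists in a fixed order: active apps newest-start-first, then completed apps newest-end-first. A stand-in sketch of that ordering, with ApplicationInfo reduced to the fields the sort actually uses:

object AppListSketch {
  final case class App(id: String, startTime: Long, endTime: Long)

  // Active apps sorted by descending start time, then completed apps sorted
  // by descending end time, exactly the order the removed method produced.
  def applicationList(active: Seq[App], completed: Seq[App]): Iterator[App] =
    active.sortBy(_.startTime).reverse.iterator ++
      completed.sortBy(_.endTime).reverse.iterator

  def main(args: Array[String]): Unit = {
    val active = Seq(App("app-2", startTime = 20, endTime = 0),
      App("app-3", startTime = 30, endTime = 0))
    val completed = Seq(App("app-1", startTime = 10, endTime = 15))
    applicationList(active, completed).map(_.id).foreach(println) // app-3, app-2, app-1
  }
}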
private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index ba9cd711f1..2cd51a9ed5 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -205,7 +205,7 @@ private[spark] object ApiRootResource {
/**
* This trait is shared by all the root containers for application UI information --
- * the HistoryServer, the Master UI, and the application UI. This provides the common
+ * the HistoryServer and the application UI. This provides the common
* interface needed for them all to expose application info as json.
*/
private[spark] trait UIRoot {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 0f30183682..02fd2985fa 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -21,7 +21,6 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType
import org.apache.spark.deploy.history.ApplicationHistoryInfo
-import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ApplicationListResource(uiRoot: UIRoot) {
@@ -84,33 +83,4 @@ private[spark] object ApplicationsListResource {
}
)
}
-
- def convertApplicationInfo(
- internal: InternalApplicationInfo,
- completed: Boolean): ApplicationInfo = {
- // standalone application info always has just one attempt
- new ApplicationInfo(
- id = internal.id,
- name = internal.desc.name,
- coresGranted = Some(internal.coresGranted),
- maxCores = internal.desc.maxCores,
- coresPerExecutor = internal.desc.coresPerExecutor,
- memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB),
- attempts = Seq(new ApplicationAttemptInfo(
- attemptId = None,
- startTime = new Date(internal.startTime),
- endTime = new Date(internal.endTime),
- duration =
- if (internal.endTime > 0) {
- internal.endTime - internal.startTime
- } else {
- 0
- },
- lastUpdated = new Date(internal.endTime),
- sparkUser = internal.desc.user,
- completed = completed
- ))
- )
- }
-
}
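
The deleted converter's only non-mechanical logic was the duration guard: an app that has not ended keeps endTime at its default 0, so the converter reported a zero duration rather than a nonsense negative one. A sketch of that guard:

object DurationSketch {
  // Matches the removed guard: only a positive endTime yields a real duration.
  def duration(startTime: Long, endTime: Long): Long =
    if (endTime > 0) endTime - startTime else 0L

  def main(args: Array[String]): Unit = {
    println(duration(startTime = 1000L, endTime = 4000L)) // 3000
    println(duration(startTime = 1000L, endTime = 0L))    // 0: still running
  }
}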