diff options
Diffstat (limited to 'core/src/main/scala/org/apache')
9 files changed, 16 insertions, 263 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 4ffb5283e9..53564d0e95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -41,7 +41,6 @@ private[spark] class ApplicationInfo( @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ - @transient @volatile var appUIUrlAtHistoryServer: Option[String] = None // A cap on the number of executors this application can have at any given time. // By default, this is infinite. Only after the first allocation request is issued by the @@ -66,7 +65,6 @@ private[spark] class ApplicationInfo( nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE) - appUIUrlAtHistoryServer = None } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -136,11 +134,4 @@ private[spark] class ApplicationInfo( System.currentTimeMillis() - startTime } } - - /** - * Returns the original application UI url unless there is its address at history server - * is defined - */ - def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index edc9be2a8a..faed4f4dc9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,25 +17,17 @@ package org.apache.spark.deploy.master -import java.io.FileNotFoundException -import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.Duration -import scala.language.postfixOps import scala.util.Random -import org.apache.hadoop.fs.Path - import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI @@ -43,9 +35,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ -import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.serializer.{JavaSerializer, Serializer} -import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, Utils} private[deploy] class Master( @@ -59,10 +49,6 @@ private[deploy] class Master( private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") - private val rebuildUIThread = - ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread") - private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread) - private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs @@ -85,8 +71,6 @@ private[deploy] class Master( private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] private val completedApps = new ArrayBuffer[ApplicationInfo] private var nextAppNumber = 0 - // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI - private val appIdToUI = new ConcurrentHashMap[String, SparkUI] private val drivers = new HashSet[DriverInfo] private val completedDrivers = new ArrayBuffer[DriverInfo] @@ -199,7 +183,6 @@ private[deploy] class Master( checkForWorkerTimeOutTask.cancel(true) } forwardMessageThread.shutdownNow() - rebuildUIThread.shutdownNow() webUi.stop() restServer.foreach(_.stop()) masterMetricsSystem.stop() @@ -391,9 +374,6 @@ private[deploy] class Master( case CheckForWorkerTimeOut => timeOutDeadWorkers() - case AttachCompletedRebuildUI(appId) => - // An asyncRebuildSparkUI has completed, so need to attach to master webUi - Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -844,7 +824,6 @@ private[deploy] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach { a => - Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) } applicationMetricsSystem.removeSource(a.appSource) } completedApps.trimStart(toRemove) @@ -852,9 +831,6 @@ private[deploy] class Master( completedApps += app // Remember it in our history waitingApps -= app - // If application events are logged, use them to rebuild the UI - asyncRebuildSparkUI(app) - for (exec <- app.executors.values) { killExecutor(exec) } @@ -953,89 +929,6 @@ private[deploy] class Master( exec.state = ExecutorState.KILLED } - /** - * Rebuild a new SparkUI from the given application's event logs. - * Return the UI if successful, else None - */ - private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = { - val futureUI = asyncRebuildSparkUI(app) - ThreadUtils.awaitResult(futureUI, Duration.Inf) - } - - /** Rebuild a new SparkUI asynchronously to not block RPC event loop */ - private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = { - val appName = app.desc.name - val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" - val eventLogDir = app.desc.eventLogDir - .getOrElse { - // Event logging is disabled for this application - app.appUIUrlAtHistoryServer = Some(notFoundBasePath) - return Future.successful(None) - } - val futureUI = Future { - val eventLogFilePrefix = EventLoggingListener.getLogPath( - eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec) - val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf) - val inProgressExists = fs.exists(new Path(eventLogFilePrefix + - EventLoggingListener.IN_PROGRESS)) - - val eventLogFile = if (inProgressExists) { - // Event logging is enabled for this application, but the application is still in progress - logWarning(s"Application $appName is still in progress, it may be terminated abnormally.") - eventLogFilePrefix + EventLoggingListener.IN_PROGRESS - } else { - eventLogFilePrefix - } - - val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) - val replayBus = new ReplayListenerBus() - val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), - appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime) - try { - replayBus.replay(logInput, eventLogFile, inProgressExists) - } finally { - logInput.close() - } - - Some(ui) - }(rebuildUIContext) - - futureUI.onSuccess { case Some(ui) => - appIdToUI.put(app.id, ui) - // `self` can be null if we are already in the process of shutting down - // This happens frequently in tests where `local-cluster` is used - if (self != null) { - self.send(AttachCompletedRebuildUI(app.id)) - } - // Application UI is successfully rebuilt, so link the Master UI to it - // NOTE - app.appUIUrlAtHistoryServer is volatile - app.appUIUrlAtHistoryServer = Some(ui.basePath) - }(ThreadUtils.sameThread) - - futureUI.onFailure { - case fnf: FileNotFoundException => - // Event logging is enabled for this application, but no event logs are found - val title = s"Application history not found (${app.id})" - var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}." - logWarning(msg) - msg += " Did you specify the correct logging directory?" - msg = URLEncoder.encode(msg, "UTF-8") - app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title") - - case e: Exception => - // Relay exception message to application UI page - val title = s"Application history load error (${app.id})" - val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8") - var msg = s"Exception in replaying log for application $appName!" - logError(msg, e) - msg = URLEncoder.encode(msg, "UTF-8") - app.appUIUrlAtHistoryServer = - Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title") - }(ThreadUtils.sameThread) - - futureUI - } - /** Generate a new app ID given a app's submission date */ private def newApplicationId(submitDate: Date): String = { val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index a055d09767..a952cee36e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -39,6 +39,4 @@ private[master] object MasterMessages { case object BoundPortsRequest case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int]) - - case class AttachCompletedRebuildUI(appId: String) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 96274958d1..8875fc2232 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -75,7 +75,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") </li> <li><strong>Submit Date:</strong> {app.submitDate}</li> <li><strong>State:</strong> {app.state}</li> - <li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li> + { + if (!app.isFinished) { + <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li> + } + } </ul> </div> </div> diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala deleted file mode 100644 index e021f1eef7..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master.ui - -import java.net.URLDecoder -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[ui] class HistoryNotFoundPage(parent: MasterWebUI) - extends WebUIPage("history/not-found") { - - /** - * Render a page that conveys failure in loading application history. - * - * This accepts 3 HTTP parameters: - * msg = message to display to the user - * title = title of the page - * exception = detailed description of the exception in loading application history (if any) - * - * Parameters "msg" and "exception" are assumed to be UTF-8 encoded. - */ - def render(request: HttpServletRequest): Seq[Node] = { - val titleParam = request.getParameter("title") - val msgParam = request.getParameter("msg") - val exceptionParam = request.getParameter("exception") - - // If no parameters are specified, assume the user did not enable event logging - val defaultTitle = "Event logging is not enabled" - val defaultContent = - <div class="row-fluid"> - <div class="span12" style="font-size:14px"> - No event logs were found for this application! To - <a href="http://spark.apache.org/docs/latest/monitoring.html">enable event logging</a>, - set <span style="font-style:italic">spark.eventLog.enabled</span> to true and - <span style="font-style:italic">spark.eventLog.dir</span> to the directory to which your - event logs are written. - </div> - </div> - - val title = Option(titleParam).getOrElse(defaultTitle) - val content = Option(msgParam) - .map { msg => URLDecoder.decode(msg, "UTF-8") } - .map { msg => - <div class="row-fluid"> - <div class="span12" style="font-size:14px">{msg}</div> - </div> ++ - Option(exceptionParam) - .map { e => URLDecoder.decode(e, "UTF-8") } - .map { e => <pre>{e}</pre> } - .getOrElse(Seq.empty) - }.getOrElse(defaultContent) - - UIUtils.basicSparkPage(content, title) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 363f4b84f8..75de3ede78 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {killLink} </td> <td> - <a href={app.curAppUIUrl}>{app.desc.name}</a> + { + if (app.isFinished) { + app.desc.name + } else { + <a href={app.desc.appUiUrl}>{app.desc.name}</a> + } + } </td> <td> {app.coresGranted} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index ae16ce90c8..a0727ad83f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui import org.apache.spark.deploy.master.Master import org.apache.spark.internal.Logging -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, - UIRoot} import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -30,60 +28,26 @@ import org.apache.spark.ui.JettyUtils._ private[master] class MasterWebUI( val master: Master, - requestedPort: Int, - customMasterPage: Option[MasterPage] = None) + requestedPort: Int) extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"), - requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { + requestedPort, master.conf, name = "MasterUI") with Logging { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) - val masterPage = customMasterPage.getOrElse(new MasterPage(this)) - initialize() /** Initialize all components of the server. */ def initialize() { val masterPage = new MasterPage(this) attachPage(new ApplicationPage(this)) - attachPage(new HistoryNotFoundPage(this)) attachPage(masterPage) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) - attachHandler(ApiRootResource.getServletHandler(this)) attachHandler(createRedirectHandler( "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) attachHandler(createRedirectHandler( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) } - - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ - def attachSparkUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - ui.getHandlers.foreach(attachHandler) - } - - /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ - def detachSparkUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - ui.getHandlers.foreach(detachHandler) - } - - def getApplicationInfoList: Iterator[ApplicationInfo] = { - val state = masterPage.getMasterState - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val completedApps = state.completedApps.sortBy(_.endTime).reverse - activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++ - completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) } - } - - def getSparkUI(appId: String): Option[SparkUI] = { - val state = masterPage.getMasterState - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val completedApps = state.completedApps.sortBy(_.endTime).reverse - (activeApps ++ completedApps).find { _.id == appId }.flatMap { - master.rebuildSparkUI - } - } } private[master] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index ba9cd711f1..2cd51a9ed5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -205,7 +205,7 @@ private[spark] object ApiRootResource { /** * This trait is shared by the all the root containers for application UI information -- - * the HistoryServer, the Master UI, and the application UI. This provides the common + * the HistoryServer and the application UI. This provides the common * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 0f30183682..02fd2985fa 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -21,7 +21,6 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo -import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ApplicationListResource(uiRoot: UIRoot) { @@ -84,33 +83,4 @@ private[spark] object ApplicationsListResource { } ) } - - def convertApplicationInfo( - internal: InternalApplicationInfo, - completed: Boolean): ApplicationInfo = { - // standalone application info always has just one attempt - new ApplicationInfo( - id = internal.id, - name = internal.desc.name, - coresGranted = Some(internal.coresGranted), - maxCores = internal.desc.maxCores, - coresPerExecutor = internal.desc.coresPerExecutor, - memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB), - attempts = Seq(new ApplicationAttemptInfo( - attemptId = None, - startTime = new Date(internal.startTime), - endTime = new Date(internal.endTime), - duration = - if (internal.endTime > 0) { - internal.endTime - internal.startTime - } else { - 0 - }, - lastUpdated = new Date(internal.endTime), - sparkUser = internal.desc.user, - completed = completed - )) - ) - } - } |