Diffstat (limited to 'core/src/main/scala/org/apache')
13 files changed, 319 insertions, 128 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe24260cdb..3f7cba6dbc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _heartbeatReceiver: RpcEndpointRef = _
   @volatile private var _dagScheduler: DAGScheduler = _
   private var _applicationId: String = _
+  private var _applicationAttemptId: Option[String] = None
   private var _eventLogger: Option[EventLoggingListener] = None
   private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
   private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }

   def applicationId: String = _applicationId
+  def applicationAttemptId: Option[String] = _applicationAttemptId

   def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.start()

     _applicationId = _taskScheduler.applicationId()
+    _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
     _env.blockManager.initialize(_applicationId)
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _eventLogger =
       if (isEventLogEnabled) {
         val logger =
-          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+            _conf, _hadoopConfiguration)
         logger.start()
         listenerBus.addListener(logger)
         Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser))
+      startTime, sparkUser, applicationAttemptId))
   }

   /** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ea6c85ee51..6a5011af17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history

 import org.apache.spark.ui.SparkUI

-private[history] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
+private[history] case class ApplicationAttemptInfo(
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = false)

+private[history] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    attempts: List[ApplicationAttemptInfo])
+
 private[history] abstract class ApplicationHistoryProvider {

   /**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
+   * @param attemptId The application attempt ID (or None if there is no attempt ID).
   * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]

   /**
    * Called when the server is shutting down.
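The change above splits the per-run fields out of ApplicationHistoryInfo into ApplicationAttemptInfo, so one listing entry can carry several attempts of the same application. A minimal sketch of the new shape (the IDs, timestamps, and user below are invented for illustration):

    // Two attempts of the same application, newest first. `completed`
    // defaults to false, so attempt2 represents a still-running attempt
    // (endTime = -1L, matching how replay() fills in missing end times).
    val attempt1 = ApplicationAttemptInfo(attemptId = Some("1"),
      startTime = 1000L, endTime = 2000L, lastUpdated = 2000L,
      sparkUser = "alice", completed = true)
    val attempt2 = ApplicationAttemptInfo(attemptId = Some("2"),
      startTime = 3000L, endTime = -1L, lastUpdated = 3000L,
      sparkUser = "alice")
    // The listing now groups all attempts under a single application entry.
    val app = ApplicationHistoryInfo("application_1428521234_0001", "MyApp",
      attempts = List(attempt2, attempt1))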
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fb2cbbcccc..993763f3aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}

 /**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }

   import FsHistoryProvider._

@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()

-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values

-  override def getAppUI(appId: String): Option[SparkUI] = {
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
-      applications.get(appId).map { info =>
-        val replayBus = new ReplayListenerBus()
-        val ui = {
-          val conf = this.conf.clone()
-          val appSecManager = new SecurityManager(conf)
-          SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
-          // Do not call ui.bind() to avoid creating a new server for each application
-        }
+      applications.get(appId).flatMap { appInfo =>
+        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+          val replayBus = new ReplayListenerBus()
+          val ui = {
+            val conf = this.conf.clone()
+            val appSecManager = new SecurityManager(conf)
+            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+            // Do not call ui.bind() to avoid creating a new server for each application
+          }

-        val appListener = new ApplicationEventListener()
-        replayBus.addListener(appListener)
-        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+          val appListener = new ApplicationEventListener()
+          replayBus.addListener(appListener)
+          val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

-        ui.setAppName(s"${appInfo.name} ($appId)")
+          ui.setAppName(s"${appInfo.name} ($appId)")

-        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-        ui.getSecurityManager.setAcls(uiAclsEnabled)
-        // make sure to set admin acls before view acls so they are properly picked up
-        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
-          appListener.viewAcls.getOrElse(""))
-        ui
+          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+          ui.getSecurityManager.setAcls(uiAclsEnabled)
+          // make sure to set admin acls before view acls so they are properly picked up
+          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+            appListener.viewAcls.getOrElse(""))
+          ui
+        }
       }
     } catch {
       case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
     val bus = new ReplayListenerBus()
-    val newApps = logs.flatMap { fileStatus =>
+    val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
         logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             e)
           None
       }
-    }.toSeq.sortWith(compareAppInfo)
-
-    // When there are new logs, merge the new list with the existing one, maintaining
-    // the expected ordering (descending end time). Maintaining the order is important
-    // to avoid having to sort the list every time there is a request for the log list.
-    if (newApps.nonEmpty) {
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id) ||
-            mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
-            !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-          mergedApps += (info.id -> info)
-        }
-      }
+    }

-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
+    if (newAttempts.isEmpty) {
+      return
+    }
+
+    // Build a map containing all apps that contain new attempts. The app information in this map
+    // contains both the new app attempt, and those that were already loaded in the existing apps
+    // map. If an attempt has been updated, it replaces the old attempt in the list.
+    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+    newAttempts.foreach { attempt =>
+      val appInfo = newAppMap.get(attempt.appId)
+        .orElse(applications.get(attempt.appId))
+        .map { app =>
+          val attempts =
+            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+            attempts.sortWith(compareAttemptInfo))
         }
+        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+      newAppMap(attempt.appId) = appInfo
+    }
+
+    // Merge the new app list with the existing one, maintaining the expected ordering (descending
+    // end time). Maintaining the order is important to avoid having to sort the list every time
+    // there is a request for the log list.
+    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+      if (!mergedApps.contains(info.id)) {
+        mergedApps += (info.id -> info)
       }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
+    }

-      applications = mergedApps
+    val newIterator = newApps.iterator.buffered
+    val oldIterator = applications.values.iterator.buffered
+    while (newIterator.hasNext && oldIterator.hasNext) {
+      if (newAppMap.contains(oldIterator.head.id)) {
+        oldIterator.next()
+      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+        addIfAbsent(newIterator.next())
+      } else {
+        addIfAbsent(oldIterator.next())
+      }
     }
+    newIterator.foreach(addIfAbsent)
+    oldIterator.foreach(addIfAbsent)
+
+    applications = mergedApps
   }

   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000

-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }
+
       // Scan all logs from the log directory.
      // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge || !info.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val (toClean, toRetain) = app.attempts.partition(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toRetain.nonEmpty) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
         }
       }

       applications = appsToRetain

-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }

-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def compareAppInfo(
       i1: FsApplicationHistoryInfo,
       i2: FsApplicationHistoryInfo): Boolean = {
-    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+    val a1 = i1.attempts.head
+    val a2 = i2.attempts.head
+    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+  }
+
+  /**
+   * Comparison function that defines the sort order for application attempts within the same
+   * application. Order is: running attempts before complete attempts, running attempts sorted
+   * by start time, completed attempts sorted by end time.
+   *
+   * Normally applications should have a single running attempt; but failure to call sc.stop()
+   * may cause multiple running attempts to show up.
+   *
+   * @return Whether `a1` should precede `a2`.
+   */
+  private def compareAttemptInfo(
+      a1: FsApplicationAttemptInfo,
+      a2: FsApplicationAttemptInfo): Boolean = {
+    if (a1.completed == a2.completed) {
+      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+    } else {
+      !a1.completed
+    }
   }

   /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      new FsApplicationHistoryInfo(
+      new FsApplicationAttemptInfo(
         logPath.getName(),
-        appListener.appId.getOrElse(logPath.getName()),
         appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
         getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }

-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
     val logPath: String,
-    id: String,
-    name: String,
+    val name: String,
+    val appId: String,
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+  extends ApplicationAttemptInfo(
+    attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+    id: String,
+    override val name: String,
+    override val attempts: List[FsApplicationAttemptInfo])
+  extends ApplicationHistoryInfo(id, name, attempts)
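The new compareAttemptInfo sorts one application's attempts so that running attempts come first (newest start first), followed by completed attempts by descending end time. A self-contained restatement of the same rule, using a stand-in case class and invented values:

    // Stand-in for FsApplicationAttemptInfo, reduced to the fields the
    // comparator actually reads; the data below is invented for illustration.
    case class Attempt(completed: Boolean, startTime: Long, endTime: Long)

    def precedes(a1: Attempt, a2: Attempt): Boolean =
      if (a1.completed == a2.completed) {
        if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
      } else {
        !a1.completed // any running attempt sorts before any completed one
      }

    val sorted = List(
      Attempt(completed = true, startTime = 10, endTime = 50),
      Attempt(completed = false, startTime = 60, endTime = -1),
      Attempt(completed = true, startTime = 20, endTime = 90)).sortWith(precedes)
    // => the running attempt first, then the completed attempts by
    //    descending end time (endTime 90 before endTime 50)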
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 3781b4e8c1..0830cc1ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -34,18 +34,28 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

-    val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
-    val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
-    val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+    val allApps = parent.getApplicationList()
+      .filter(_.attempts.head.completed != requestedIncomplete)
+    val allAppsSize = allApps.size
+
+    val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+    val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)

     val actualPage = (actualFirst / pageSize) + 1
-    val last = Math.min(actualFirst + pageSize, allApps.size) - 1
-    val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+    val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+    val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)

     val secondPageFromLeft = 2
     val secondPageFromRight = pageCount - 1

-    val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+    val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1)
+    val appTable =
+      if (hasMultipleAttempts) {
+        UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow)
+      } else {
+        UIUtils.listingTable(appHeader, appRow, appsToShow)
+      }
+
     val providerConfig = parent.getProviderConfig()
     val content =
       <div class="row-fluid">
@@ -59,7 +69,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
           // to the first and last page. If the current page +/- `plusOrMinus` is greater
           // than the 2nd page from the first page or less than the 2nd page from the last
           // page, `...` will be displayed.
-          if (allApps.size > 0) {
+          if (allAppsSize > 0) {
             val leftSideIndices =
               rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
             val rightSideIndices =
@@ -67,7 +77,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
                 requestedIncomplete)

             <h4>
-              Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+              Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
               {if (requestedIncomplete) "(Incomplete applications)"}
               <span style="float: right">
                 {
@@ -125,30 +135,85 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     "Spark User",
     "Last Updated")

-  private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean):
-  Seq[Node] = {
+  private val appWithAttemptHeader = Seq(
+    "App ID",
+    "App Name",
+    "Attempt ID",
+    "Started",
+    "Completed",
+    "Duration",
+    "Spark User",
+    "Last Updated")
+
+  private def rangeIndices(
+      range: Seq[Int],
+      condition: Int => Boolean,
+      showIncomplete: Boolean): Seq[Node] = {
     range.filter(condition).map(nextPage =>
       <a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
   }

-  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
-    val startTime = UIUtils.formatDate(info.startTime)
-    val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+  private def attemptRow(
+      renderAttemptIdColumn: Boolean,
+      info: ApplicationHistoryInfo,
+      attempt: ApplicationAttemptInfo,
+      isFirst: Boolean): Seq[Node] = {
+    val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+    val startTime = UIUtils.formatDate(attempt.startTime)
+    val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
     val duration =
-      if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
-    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+      if (attempt.endTime > 0) {
+        UIUtils.formatDuration(attempt.endTime - attempt.startTime)
+      } else {
+        "-"
+      }
+    val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
     <tr>
-      <td><a href={uiAddress}>{info.id}</a></td>
-      <td>{info.name}</td>
-      <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
-      <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
-      <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
-      <td>{info.sparkUser}</td>
-      <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
+      {
+        if (isFirst) {
+          if (info.attempts.size > 1 || renderAttemptIdColumn) {
+            <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+              <a href={uiAddress}>{info.id}</a></td>
+            <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+              {info.name}</td>
+          } else {
+            <td><a href={uiAddress}>{info.id}</a></td>
+            <td>{info.name}</td>
+          }
+        } else {
+          Nil
+        }
+      }
+      {
+        if (renderAttemptIdColumn) {
+          if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
+            <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
+              {attempt.attemptId.get}</a></td>
+          } else {
+            <td>&nbsp;</td>
+          }
+        } else {
+          Nil
+        }
+      }
+      <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
+      <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
+      <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
+        {duration}</td>
+      <td>{attempt.sparkUser}</td>
+      <td sorttable_customkey={attempt.lastUpdated.toString}>{lastUpdated}</td>
     </tr>
   }

+  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    attemptRow(false, info, info.attempts.head, true)
+  }
+
+  private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    attemptRow(true, info, info.attempts.head, true) ++
+      info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
+  }
+
   private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
     "/?" + Array(
       "page=" + linkPage,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56bef57e55..754c8e9b66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,7 +52,11 @@ class HistoryServer(
   private val appLoader = new CacheLoader[String, SparkUI] {
     override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
+      val parts = key.split("/")
+      require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
+      val ui = provider
+        .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
+        .getOrElse(throw new NoSuchElementException())
       attachSparkUI(ui)
       ui
     }
@@ -69,6 +73,8 @@ class HistoryServer(

   private val loaderServlet = new HttpServlet {
     protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+      // Parse the URI created by getAttemptURI(). It contains an app ID and an optional
+      // attempt ID (separated by a slash).
       val parts = Option(req.getPathInfo()).getOrElse("").split("/")
       if (parts.length < 2) {
         res.sendError(HttpServletResponse.SC_BAD_REQUEST,
@@ -76,18 +82,23 @@ class HistoryServer(
         return
       }

-      val appId = parts(1)
+      val appKey =
+        if (parts.length == 3) {
+          s"${parts(1)}/${parts(2)}"
+        } else {
+          parts(1)
+        }

       // Note we don't use the UI retrieved from the cache; the cache loader above will register
       // the app's UI, and all we need to do is redirect the user to the same URI that was
       // requested, and the proper data should be served at that point.
       try {
-        appCache.get(appId)
+        appCache.get(appKey)
         res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
       } catch {
         case e: Exception => e.getCause() match {
           case nsee: NoSuchElementException =>
-            val msg = <div class="row-fluid">Application {appId} not found.</div>
+            val msg = <div class="row-fluid">Application {appKey} not found.</div>
             res.setStatus(HttpServletResponse.SC_NOT_FOUND)
             UIUtils.basicSparkPage(msg, "Not Found").foreach(
               n => res.getWriter().write(n.toString))
@@ -213,4 +224,9 @@ object HistoryServer extends Logging {
     }
   }

+  private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+    val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
+    s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
+  }
+
 }
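getAttemptURI and the cache-key parsing in appLoader are mirror images: the URI suffix (and cache key) is the app ID alone, or the app ID and attempt ID joined by a slash. Assuming UI_PATH_PREFIX is "/history" (its value elsewhere in HistoryServer), the helper behaves like this (app IDs invented):

    HistoryServer.getAttemptURI("app-20150401000000-0001", None)
    // => "/history/app-20150401000000-0001"
    HistoryServer.getAttemptURI("application_1428521234_0001", Some("2"))
    // => "/history/application_1428521234_0001/2"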
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1c21c17956..dc6077f3d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -62,7 +62,7 @@ private[master] class Master(
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
-  
+
   private val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
   private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
@@ -86,7 +86,7 @@ private[master] class Master(
   private val drivers = new HashSet[DriverInfo]
   private val completedDrivers = new ArrayBuffer[DriverInfo]
   // Drivers currently spooled for scheduling
-  private val waitingDrivers = new ArrayBuffer[DriverInfo] 
+  private val waitingDrivers = new ArrayBuffer[DriverInfo]
   private var nextDriverNumber = 0

   Utils.checkHost(host, "Expected hostname")
@@ -758,24 +758,24 @@ private[master] class Master(
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
-    
+
     val eventLogFilePrefix = EventLoggingListener.getLogPath(
-        eventLogDir, app.id, app.desc.eventLogCodec)
+        eventLogDir, app.id, None, app.desc.eventLogCodec)
     val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
-    val inProgressExists = fs.exists(new Path(eventLogFilePrefix + 
+    val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
       EventLoggingListener.IN_PROGRESS))
-    
+
     if (inProgressExists) {
       // Event logging is enabled for this application, but the application is still in progress
       logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
     }
-    
+
     val (eventLogFile, status) = if (inProgressExists) {
       (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
     } else {
       (eventLogFilePrefix, " (completed)")
     }
-    
+
     val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
     val replayBus = new ReplayListenerBus()
     val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
@@ -859,8 +859,8 @@ private[master] class Master(
   }

   private def removeDriver(
-      driverId: String, 
-      finalState: DriverState, 
+      driverId: String,
+      finalState: DriverState,
       exception: Option[Exception]) {
     drivers.find(d => d.id == driverId) match {
       case Some(driver) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa..9f218c64ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
 private[spark] class ApplicationEventListener extends SparkListener {
   var appName: Option[String] = None
   var appId: Option[String] = None
+  var appAttemptId: Option[String] = None
   var sparkUser: Option[String] = None
   var startTime: Option[Long] = None
   var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
     appName = Some(applicationStart.appName)
     appId = applicationStart.appId
+    appAttemptId = applicationStart.appAttemptId
     startTime = Some(applicationStart.time)
     sparkUser = Some(applicationStart.sparkUser)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727db2..529a5b2bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
+    appAttemptId: Option[String],
     logBaseDir: URI,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(

   import EventLoggingListener._

-  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
-    this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, appAttemptId: Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+    this(appId, appAttemptId, logBaseDir, sparkConf,
+      SparkHadoopUtil.get.newConfiguration(sparkConf))

   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

   // Visible for tests only.
-  private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

   /**
    * Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
    * we won't know which codec to use to decompress the metadata needed to open the file in
    * the first place.
    *
+   * The log file name will identify the compression codec used for the contents, if any.
+   * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
+   * @param appAttemptId The attempt ID for this run, or None if there is no attempt ID.
    * @param compressionCodecName Name to identify the codec used to compress the contents
    *                             of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
   def getLogPath(
       logBaseDir: URI,
       appId: String,
+      appAttemptId: Option[String],
       compressionCodecName: Option[String] = None): String = {
-    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
-    // e.g. app_123, app_123.lzf
-    val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
-    logBaseDir.toString.stripSuffix("/") + "/" + logName
+    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val codec = compressionCodecName.map("." + _).getOrElse("")
+    if (appAttemptId.isDefined) {
+      base + "_" + sanitize(appAttemptId.get) + codec
+    } else {
+      base + codec
+    }
+  }
+
+  private def sanitize(str: String): String = {
+    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+  }

   /**
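With the attempt ID folded into the file name, getLogPath produces one log per attempt: the attempt ID is sanitized the same way as the app ID and joined with an underscore, before the optional codec extension. Sample outputs (directory and IDs invented):

    import java.net.URI

    val logDir = new URI("/tmp/spark-events")
    EventLoggingListener.getLogPath(logDir, "app_123", None)
    // => "/tmp/spark-events/app_123"
    EventLoggingListener.getLogPath(logDir, "app_123", Some("1"), Some("lzf"))
    // => "/tmp/spark-events/app_123_1.lzf"
    EventLoggingListener.getLogPath(logDir, "App 1", Some("attempt:2"))
    // => "/tmp/spark-events/app-1_attempt-2"
    //    (sanitize() lowercases and maps " :/" to "-")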
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493..646820520e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,12 @@ private[spark] trait SchedulerBackend {
    */
   def applicationId(): String = appId

+  /**
+   * Get the attempt ID for this run, if the cluster manager supports multiple
+   * attempts. Applications run in client mode will not have attempt IDs.
+   *
+   * @return The application attempt ID, if available.
+   */
+  def applicationAttemptId(): Option[String] = None
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b711ff209a..169d4fd3a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent

 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
-  sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+  time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e..f25f3ed0d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,9 +73,17 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
-  
+
   /**
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+  /**
+   * Get the attempt ID for this run of the application, if the cluster manager
+   * supports multiple attempts.
+   *
+   * @return The application attempt ID, if available.
+   */
+  def applicationAttemptId(): Option[String]
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d836f..b4b8a63069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -513,6 +513,8 @@ private[spark] class TaskSchedulerImpl(

   override def applicationId(): String = backend.applicationId()

+  override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
+
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 44d274956d..ee02fbd9ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
-    ("User" -> applicationStart.sparkUser)
+    ("User" -> applicationStart.sparkUser) ~
+    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
   }

   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
     val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
-    SparkListenerApplicationStart(appName, appId, time, sparkUser)
+    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
+    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
   }

   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
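End to end, the attempt ID now flows from the backend through SparkListenerApplicationStart into the event log JSON and back out during replay. A sketch of the round trip through the (private[spark]) JsonProtocol, with invented values:

    val start = SparkListenerApplicationStart(
      "MyApp", Some("app_123"), 1428521234000L, "alice", Some("1"))
    val json = JsonProtocol.applicationStartToJson(start)
    // The serialized form includes, among other fields:
    //   "User":"alice", "App Attempt ID":"1"
    // Logs written before this change have no "App Attempt ID" key;
    // Utils.jsonOption maps the missing field to None, so old event logs
    // still replay cleanly.
    assert(JsonProtocol.applicationStartFromJson(json) == start)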