Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                              |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala          | 211
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                | 111
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala              |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                      |  20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala        |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala            |  28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala                |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala                   |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala               |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                         |   6
13 files changed, 319 insertions(+), 128 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe24260cdb..3f7cba6dbc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
+ private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
def applicationId: String = _applicationId
+ def applicationAttemptId: Option[String] = _applicationAttemptId
def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
+ _applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_env.blockManager.initialize(_applicationId)
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_eventLogger =
if (isEventLogEnabled) {
val logger =
- new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+ new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+ _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser))
+ startTime, sparkUser, applicationAttemptId))
}
/** Post the application end event */
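With the lines added above, the attempt ID reported by the task scheduler is exposed on SparkContext next to applicationId and carried into the application start event. A minimal driver sketch, assuming the job is launched through spark-submit (which supplies the master); the object name is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object AttemptIdExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("attempt-id-example"))
    // applicationAttemptId is None unless the cluster manager provides one (e.g. YARN cluster mode).
    println(s"appId=${sc.applicationId} attempt=${sc.applicationAttemptId.getOrElse("<none>")}")
    sc.stop()
  }
}
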
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ea6c85ee51..6a5011af17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history
import org.apache.spark.ui.SparkUI
-private[history] case class ApplicationHistoryInfo(
- id: String,
- name: String,
+private[history] case class ApplicationAttemptInfo(
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
+private[history] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ attempts: List[ApplicationAttemptInfo])
+
private[history] abstract class ApplicationHistoryProvider {
/**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
+ * @param attemptId The application attempt ID (or None if there is no attempt ID).
* @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
/**
* Called when the server is shutting down.
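ApplicationHistoryInfo now holds a list of attempts rather than the timing fields directly. A REPL-style sketch of the new one-to-many shape (the real case classes are private[history], so these mirrors are illustrative only; IDs and times are made up):

case class AttemptInfo(
    attemptId: Option[String],
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String,
    completed: Boolean = false)

case class AppInfo(id: String, name: String, attempts: List[AttemptInfo])

// One application with two attempts, newest first. getAppUI(id, Some("2")) selects the
// second attempt; getAppUI(id, None) matches an attempt logged without an attempt ID.
val app = AppInfo("application_1430000000000_0001", "PageRank", List(
  AttemptInfo(Some("2"), startTime = 2000L, endTime = 3000L, lastUpdated = 3000L,
    sparkUser = "alice", completed = true),
  AttemptInfo(Some("1"), startTime = 0L, endTime = 1000L, lastUpdated = 1000L,
    sparkUser = "alice", completed = true)))
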
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fb2cbbcccc..993763f3aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
*/
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
- with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+ extends ApplicationHistoryProvider with Logging {
+
+ def this(conf: SparkConf) = {
+ this(conf, new SystemClock())
+ }
import FsHistoryProvider._
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
- // List of applications to be deleted by event log cleaner.
- private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+ // List of application logs to be deleted by event log cleaner.
+ private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
- override def getAppUI(appId: String): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
- applications.get(appId).map { info =>
- val replayBus = new ReplayListenerBus()
- val ui = {
- val conf = this.conf.clone()
- val appSecManager = new SecurityManager(conf)
- SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
- s"${HistoryServer.UI_PATH_PREFIX}/$appId")
- // Do not call ui.bind() to avoid creating a new server for each application
- }
+ applications.get(appId).flatMap { appInfo =>
+ appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ val replayBus = new ReplayListenerBus()
+ val ui = {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+ HistoryServer.getAttemptURI(appId, attempt.attemptId))
+ // Do not call ui.bind() to avoid creating a new server for each application
+ }
- val appListener = new ApplicationEventListener()
- replayBus.addListener(appListener)
- val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+ val appListener = new ApplicationEventListener()
+ replayBus.addListener(appListener)
+ val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
- ui.setAppName(s"${appInfo.name} ($appId)")
+ ui.setAppName(s"${appInfo.name} ($appId)")
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
} catch {
case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
val bus = new ReplayListenerBus()
- val newApps = logs.flatMap { fileStatus =>
+ val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
e)
None
}
- }.toSeq.sortWith(compareAppInfo)
-
- // When there are new logs, merge the new list with the existing one, maintaining
- // the expected ordering (descending end time). Maintaining the order is important
- // to avoid having to sort the list every time there is a request for the log list.
- if (newApps.nonEmpty) {
- val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
- if (!mergedApps.contains(info.id) ||
- mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
- !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- mergedApps += (info.id -> info)
- }
- }
+ }
- val newIterator = newApps.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next())
- } else {
- addIfAbsent(oldIterator.next())
+ if (newAttempts.isEmpty) {
+ return
+ }
+
+ // Build a map containing all apps that contain new attempts. The app information in this map
+ // contains both the new app attempt, and those that were already loaded in the existing apps
+ // map. If an attempt has been updated, it replaces the old attempt in the list.
+ val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+ newAttempts.foreach { attempt =>
+ val appInfo = newAppMap.get(attempt.appId)
+ .orElse(applications.get(attempt.appId))
+ .map { app =>
+ val attempts =
+ app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+ new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+ attempts.sortWith(compareAttemptInfo))
}
+ .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+ newAppMap(attempt.appId) = appInfo
+ }
+
+ // Merge the new app list with the existing one, maintaining the expected ordering (descending
+ // end time). Maintaining the order is important to avoid having to sort the list every time
+ // there is a request for the log list.
+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id)) {
+ mergedApps += (info.id -> info)
}
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
+ }
- applications = mergedApps
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newAppMap.contains(oldIterator.head.id)) {
+ oldIterator.next()
+ } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
+ }
}
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
}
/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
- private def cleanLogs(): Unit = {
+ private[history] def cleanLogs(): Unit = {
try {
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
- val now = System.currentTimeMillis()
+ val now = clock.getTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+ now - attempt.lastUpdated > maxAge && attempt.completed
+ }
+
// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
- applications.values.foreach { info =>
- if (now - info.lastUpdated <= maxAge || !info.completed) {
- appsToRetain += (info.id -> info)
- } else {
- appsToClean += info
+ applications.values.foreach { app =>
+ val (toClean, toRetain) = app.attempts.partition(shouldClean)
+ attemptsToClean ++= toClean
+
+ if (toClean.isEmpty) {
+ appsToRetain += (app.id -> app)
+ } else if (toRetain.nonEmpty) {
+ appsToRetain += (app.id ->
+ new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
}
}
applications = appsToRetain
- val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
- appsToClean.foreach { info =>
+ val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+ attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, info.logPath)
+ val path = new Path(logDir, attempt.logPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case e: AccessControlException =>
- logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+ logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
case t: IOException =>
- logError(s"IOException in cleaning logs of ${info.logPath}", t)
- leftToClean += info
+ logError(s"IOException in cleaning ${attempt.logPath}", t)
+ leftToClean += attempt
}
}
- appsToClean = leftToClean
+ attemptsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
- if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+ val a1 = i1.attempts.head
+ val a2 = i2.attempts.head
+ if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ }
+
+ /**
+ * Comparison function that defines the sort order for application attempts within the same
+ * application. Order is: running attempts before complete attempts, running attempts sorted
+ * by start time, completed attempts sorted by end time.
+ *
+ * Normally applications should have a single running attempt; but failure to call sc.stop()
+ * may cause multiple running attempts to show up.
+ *
+ * @return Whether `a1` should precede `a2`.
+ */
+ private def compareAttemptInfo(
+ a1: FsApplicationAttemptInfo,
+ a2: FsApplicationAttemptInfo): Boolean = {
+ if (a1.completed == a2.completed) {
+ if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ } else {
+ !a1.completed
+ }
}
/**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+ private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationHistoryInfo(
+ new FsApplicationAttemptInfo(
logPath.getName(),
- appListener.appId.getOrElse(logPath.getName()),
appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
val logPath: String,
- id: String,
- name: String,
+ val name: String,
+ val appId: String,
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = true)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+ extends ApplicationAttemptInfo(
+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+ id: String,
+ override val name: String,
+ override val attempts: List[FsApplicationAttemptInfo])
+ extends ApplicationHistoryInfo(id, name, attempts)
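compareAttemptInfo above decides how attempts of the same application are ordered. A REPL-style sketch of that rule in isolation, with made-up timestamps, showing a still-running attempt sorting ahead of completed ones:

case class Attempt(attemptId: Option[String], startTime: Long, endTime: Long, completed: Boolean)

// true means a1 should precede a2: running attempts first (newest start first),
// then completed attempts (newest end first), mirroring compareAttemptInfo.
def precedes(a1: Attempt, a2: Attempt): Boolean = {
  if (a1.completed == a2.completed) {
    if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
  } else {
    !a1.completed
  }
}

val attempts = List(
  Attempt(Some("1"), startTime = 0L, endTime = 100L, completed = true),
  Attempt(Some("3"), startTime = 250L, endTime = -1L, completed = false),
  Attempt(Some("2"), startTime = 150L, endTime = 200L, completed = true))

println(attempts.sortWith(precedes).map(_.attemptId))  // List(Some(3), Some(2), Some(1))
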
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 3781b4e8c1..0830cc1ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -34,18 +34,28 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
- val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
- val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+ val allApps = parent.getApplicationList()
+ .filter(_.attempts.head.completed != requestedIncomplete)
+ val allAppsSize = allApps.size
+
+ val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+ val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)
val actualPage = (actualFirst / pageSize) + 1
- val last = Math.min(actualFirst + pageSize, allApps.size) - 1
- val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+ val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+ val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1
- val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1)
+ val appTable =
+ if (hasMultipleAttempts) {
+ UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow)
+ } else {
+ UIUtils.listingTable(appHeader, appRow, appsToShow)
+ }
+
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
@@ -59,7 +69,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
- if (allApps.size > 0) {
+ if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
@@ -67,7 +77,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
requestedIncomplete)
<h4>
- Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
@@ -125,30 +135,85 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
"Spark User",
"Last Updated")
- private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean):
- Seq[Node] = {
+ private val appWithAttemptHeader = Seq(
+ "App ID",
+ "App Name",
+ "Attempt ID",
+ "Started",
+ "Completed",
+ "Duration",
+ "Spark User",
+ "Last Updated")
+
+ private def rangeIndices(
+ range: Seq[Int],
+ condition: Int => Boolean,
+ showIncomplete: Boolean): Seq[Node] = {
range.filter(condition).map(nextPage =>
<a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
}
- private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
- val startTime = UIUtils.formatDate(info.startTime)
- val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+ private def attemptRow(
+ renderAttemptIdColumn: Boolean,
+ info: ApplicationHistoryInfo,
+ attempt: ApplicationAttemptInfo,
+ isFirst: Boolean): Seq[Node] = {
+ val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+ val startTime = UIUtils.formatDate(attempt.startTime)
+ val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
val duration =
- if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
- val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+ if (attempt.endTime > 0) {
+ UIUtils.formatDuration(attempt.endTime - attempt.startTime)
+ } else {
+ "-"
+ }
+ val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
<tr>
- <td><a href={uiAddress}>{info.id}</a></td>
- <td>{info.name}</td>
- <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
- <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
- <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
- <td>{info.sparkUser}</td>
- <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
+ {
+ if (isFirst) {
+ if (info.attempts.size > 1 || renderAttemptIdColumn) {
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ <a href={uiAddress}>{info.id}</a></td>
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ {info.name}</td>
+ } else {
+ <td><a href={uiAddress}>{info.id}</a></td>
+ <td>{info.name}</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ {
+ if (renderAttemptIdColumn) {
+ if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
+ <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
+ {attempt.attemptId.get}</a></td>
+ } else {
+ <td>&nbsp;</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
+ <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
+ <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
+ {duration}</td>
+ <td>{attempt.sparkUser}</td>
+ <td sorttable_customkey={attempt.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}
+ private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(false, info, info.attempts.head, true)
+ }
+
+ private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(true, info, info.attempts.head, true) ++
+ info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
+ }
+
private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
"page=" + linkPage,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56bef57e55..754c8e9b66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,7 +52,11 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
- val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
+ val parts = key.split("/")
+ require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
+ val ui = provider
+ .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
+ .getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
@@ -69,6 +73,8 @@ class HistoryServer(
private val loaderServlet = new HttpServlet {
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ // Parse the URI created by getAttemptURI(). It contains an app ID and an optional
+ // attempt ID (separated by a slash).
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
@@ -76,18 +82,23 @@ class HistoryServer(
return
}
- val appId = parts(1)
+ val appKey =
+ if (parts.length == 3) {
+ s"${parts(1)}/${parts(2)}"
+ } else {
+ parts(1)
+ }
// Note we don't use the UI retrieved from the cache; the cache loader above will register
// the app's UI, and all we need to do is redirect the user to the same URI that was
// requested, and the proper data should be served at that point.
try {
- appCache.get(appId)
+ appCache.get(appKey)
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
} catch {
case e: Exception => e.getCause() match {
case nsee: NoSuchElementException =>
- val msg = <div class="row-fluid">Application {appId} not found.</div>
+ val msg = <div class="row-fluid">Application {appKey} not found.</div>
res.setStatus(HttpServletResponse.SC_NOT_FOUND)
UIUtils.basicSparkPage(msg, "Not Found").foreach(
n => res.getWriter().write(n.toString))
@@ -213,4 +224,9 @@ object HistoryServer extends Logging {
}
}
+ private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+ val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
+ s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
+ }
+
}
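The cache key and the redirect URI share one convention: the attempt ID, when present, is appended to the app ID after a slash. A REPL-style sketch of both directions (the "/history" prefix mirrors HistoryServer.UI_PATH_PREFIX; the IDs are made up):

val uiPathPrefix = "/history"

def attemptURI(appId: String, attemptId: Option[String]): String =
  s"$uiPathPrefix/$appId" + attemptId.map(id => s"/$id").getOrElse("")

def parseKey(key: String): (String, Option[String]) = {
  val parts = key.split("/")
  require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
  (parts(0), if (parts.length > 1) Some(parts(1)) else None)
}

println(attemptURI("application_1430000000000_0001", Some("2")))
// /history/application_1430000000000_0001/2
println(parseKey("application_1430000000000_0001/2"))
// (application_1430000000000_0001,Some(2))
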
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1c21c17956..dc6077f3d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -62,7 +62,7 @@ private[master] class Master(
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
-
+
private val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
@@ -86,7 +86,7 @@ private[master] class Master(
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling
- private val waitingDrivers = new ArrayBuffer[DriverInfo]
+ private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0
Utils.checkHost(host, "Expected hostname")
@@ -758,24 +758,24 @@ private[master] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
-
+
val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, app.desc.eventLogCodec)
+ eventLogDir, app.id, None, app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
- val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
+ val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
-
+
if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
}
-
+
val (eventLogFile, status) = if (inProgressExists) {
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
} else {
(eventLogFilePrefix, " (completed)")
}
-
+
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
@@ -859,8 +859,8 @@ private[master] class Master(
}
private def removeDriver(
- driverId: String,
- finalState: DriverState,
+ driverId: String,
+ finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa..9f218c64ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
+ var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
+ appAttemptId = applicationStart.appAttemptId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727db2..529a5b2bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
+ appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
- def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
- this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+ this(appId, appAttemptId, logBaseDir, sparkConf,
+ SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
* we won't know which codec to use to decompress the metadata needed to open the file in
* the first place.
*
+ * The log file name will identify the compression codec used for the contents, if any.
+ * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param appAttemptId A unique attempt id of appId. May be the empty string.
* @param compressionCodecName Name to identify the codec used to compress the contents
* of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
def getLogPath(
logBaseDir: URI,
appId: String,
+ appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
- val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
- // e.g. app_123, app_123.lzf
- val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
- logBaseDir.toString.stripSuffix("/") + "/" + logName
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isDefined) {
+ base + "_" + sanitize(appAttemptId.get) + codec
+ } else {
+ base + codec
+ }
+ }
+
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}
/**
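getLogPath now folds the attempt ID into the file name, so two attempts of the same application no longer write to the same log. A REPL-style sketch of the naming scheme (sanitize mirrors the private helper above; directory, IDs and codec are made up):

def sanitize(str: String): String =
  str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase

def logPath(baseDir: String, appId: String, attemptId: Option[String], codec: Option[String]): String = {
  val stem = baseDir.stripSuffix("/") + "/" + sanitize(appId)
  val suffix = codec.map("." + _).getOrElse("")
  attemptId.map(id => stem + "_" + sanitize(id) + suffix).getOrElse(stem + suffix)
}

println(logPath("hdfs:///spark-events", "application_1430000000000_0007", None, None))
// hdfs:///spark-events/application_1430000000000_0007
println(logPath("hdfs:///spark-events", "application_1430000000000_0007", Some("1"), Some("lzf")))
// hdfs:///spark-events/application_1430000000000_0007_1.lzf
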
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493..646820520e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,12 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId
+ /**
+ * Get the attempt ID for this run, if the cluster manager supports multiple
+ * attempts. Applications run in client mode will not have attempt IDs.
+ *
+ * @return The application attempt id, if available.
+ */
+ def applicationAttemptId(): Option[String] = None
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b711ff209a..169d4fd3a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
- sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+ time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e..f25f3ed0d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,9 +73,17 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
-
+
/**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+ /**
+ * Get an application's attempt ID associated with the job.
+ *
+ * @return An application's Attempt ID
+ */
+ def applicationAttemptId(): Option[String]
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d836f..b4b8a63069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -513,6 +513,8 @@ private[spark] class TaskSchedulerImpl(
override def applicationId(): String = backend.applicationId()
+ override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 44d274956d..ee02fbd9ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
- ("User" -> applicationStart.sparkUser)
+ ("User" -> applicationStart.sparkUser) ~
+ ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, appId, time, sparkUser)
+ val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
+ SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
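
With the writer and reader changes above, the application start event gains an optional "App Attempt ID" field; logs written by older versions simply omit it. A sketch of reading it back, assuming json4s (the library JsonProtocol builds on) is on the classpath and using made-up values:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

val event = parse("""
  {
    "Event": "SparkListenerApplicationStart",
    "App Name": "PageRank",
    "App ID": "application_1430000000000_0007",
    "Timestamp": 1430917380000,
    "User": "alice",
    "App Attempt ID": "1"
  }
""")

// A missing field comes back as None, which is why the new field is read as an Option above.
println((event \ "App Attempt ID").extractOpt[String])  // Some(1)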