-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 211
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala | 111
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 243
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 11
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 7
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 12
21 files changed, 546 insertions, 201 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe24260cdb..3f7cba6dbc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
+ private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
def applicationId: String = _applicationId
+ def applicationAttemptId: Option[String] = _applicationAttemptId
def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
+ _applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_env.blockManager.initialize(_applicationId)
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_eventLogger =
if (isEventLogEnabled) {
val logger =
- new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+ new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+ _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser))
+ startTime, sparkUser, applicationAttemptId))
}
/** Post the application end event */
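For context, a minimal sketch (not part of the patch) of how the new appAttemptId field surfaces to listeners once SparkContext posts the start event; AttemptIdLogger is a hypothetical name.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

    // Hypothetical listener that records the attempt ID carried by the start event.
    class AttemptIdLogger extends SparkListener {
      @volatile var attemptId: Option[String] = None

      override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
        // appAttemptId is the field added by this patch; it is None unless the
        // cluster manager (e.g. YARN in cluster mode) supplies an attempt ID.
        attemptId = event.appAttemptId
      }
    }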
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ea6c85ee51..6a5011af17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history
import org.apache.spark.ui.SparkUI
-private[history] case class ApplicationHistoryInfo(
- id: String,
- name: String,
+private[history] case class ApplicationAttemptInfo(
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
+private[history] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ attempts: List[ApplicationAttemptInfo])
+
private[history] abstract class ApplicationHistoryProvider {
/**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
+ * @param attemptId The application attempt ID (or None if there is no attempt ID).
* @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
/**
* Called when the server is shutting down.
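To make the new listing shape concrete, a small sketch with made-up IDs and timestamps; it assumes code living in the org.apache.spark.deploy.history package, since the classes defined above are private[history].

    // One application with two attempts, newest first -- the ordering the provider
    // maintains so that attempts.head is always the most recent run.
    // ApplicationAttemptInfo(attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
    val attempts = List(
      ApplicationAttemptInfo(Some("attempt2"), 3L, 4L, 10L, "test", completed = true),
      ApplicationAttemptInfo(Some("attempt1"), 1L, 2L, 5L, "test", completed = true))

    val app = ApplicationHistoryInfo("app-1", "example-app", attempts)
    assert(app.attempts.head.attemptId == Some("attempt2"))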
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fb2cbbcccc..993763f3aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
*/
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
- with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+ extends ApplicationHistoryProvider with Logging {
+
+ def this(conf: SparkConf) = {
+ this(conf, new SystemClock())
+ }
import FsHistoryProvider._
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
- // List of applications to be deleted by event log cleaner.
- private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+ // List of application logs to be deleted by event log cleaner.
+ private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
- override def getAppUI(appId: String): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
- applications.get(appId).map { info =>
- val replayBus = new ReplayListenerBus()
- val ui = {
- val conf = this.conf.clone()
- val appSecManager = new SecurityManager(conf)
- SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
- s"${HistoryServer.UI_PATH_PREFIX}/$appId")
- // Do not call ui.bind() to avoid creating a new server for each application
- }
+ applications.get(appId).flatMap { appInfo =>
+ appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ val replayBus = new ReplayListenerBus()
+ val ui = {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+ HistoryServer.getAttemptURI(appId, attempt.attemptId))
+ // Do not call ui.bind() to avoid creating a new server for each application
+ }
- val appListener = new ApplicationEventListener()
- replayBus.addListener(appListener)
- val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+ val appListener = new ApplicationEventListener()
+ replayBus.addListener(appListener)
+ val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
- ui.setAppName(s"${appInfo.name} ($appId)")
+ ui.setAppName(s"${appInfo.name} ($appId)")
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
} catch {
case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
val bus = new ReplayListenerBus()
- val newApps = logs.flatMap { fileStatus =>
+ val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
e)
None
}
- }.toSeq.sortWith(compareAppInfo)
-
- // When there are new logs, merge the new list with the existing one, maintaining
- // the expected ordering (descending end time). Maintaining the order is important
- // to avoid having to sort the list every time there is a request for the log list.
- if (newApps.nonEmpty) {
- val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
- if (!mergedApps.contains(info.id) ||
- mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
- !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- mergedApps += (info.id -> info)
- }
- }
+ }
- val newIterator = newApps.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next())
- } else {
- addIfAbsent(oldIterator.next())
+ if (newAttempts.isEmpty) {
+ return
+ }
+
+ // Build a map containing all apps that contain new attempts. The app information in this map
+ // contains both the new app attempt, and those that were already loaded in the existing apps
+ // map. If an attempt has been updated, it replaces the old attempt in the list.
+ val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+ newAttempts.foreach { attempt =>
+ val appInfo = newAppMap.get(attempt.appId)
+ .orElse(applications.get(attempt.appId))
+ .map { app =>
+ val attempts =
+ app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+ new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+ attempts.sortWith(compareAttemptInfo))
}
+ .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+ newAppMap(attempt.appId) = appInfo
+ }
+
+ // Merge the new app list with the existing one, maintaining the expected ordering (descending
+ // end time). Maintaining the order is important to avoid having to sort the list every time
+ // there is a request for the log list.
+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id)) {
+ mergedApps += (info.id -> info)
}
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
+ }
- applications = mergedApps
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newAppMap.contains(oldIterator.head.id)) {
+ oldIterator.next()
+ } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
+ }
}
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
}
/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
- private def cleanLogs(): Unit = {
+ private[history] def cleanLogs(): Unit = {
try {
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
- val now = System.currentTimeMillis()
+ val now = clock.getTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+ now - attempt.lastUpdated > maxAge && attempt.completed
+ }
+
// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
- applications.values.foreach { info =>
- if (now - info.lastUpdated <= maxAge || !info.completed) {
- appsToRetain += (info.id -> info)
- } else {
- appsToClean += info
+ applications.values.foreach { app =>
+ val (toClean, toRetain) = app.attempts.partition(shouldClean)
+ attemptsToClean ++= toClean
+
+ if (toClean.isEmpty) {
+ appsToRetain += (app.id -> app)
+ } else if (toRetain.nonEmpty) {
+ appsToRetain += (app.id ->
+ new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
}
}
applications = appsToRetain
- val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
- appsToClean.foreach { info =>
+ val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+ attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, info.logPath)
+ val path = new Path(logDir, attempt.logPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case e: AccessControlException =>
- logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+ logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
case t: IOException =>
- logError(s"IOException in cleaning logs of ${info.logPath}", t)
- leftToClean += info
+ logError(s"IOException in cleaning ${attempt.logPath}", t)
+ leftToClean += attempt
}
}
- appsToClean = leftToClean
+ attemptsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
- if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+ val a1 = i1.attempts.head
+ val a2 = i2.attempts.head
+ if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ }
+
+ /**
+ * Comparison function that defines the sort order for application attempts within the same
+ * application. Order is: running attempts before complete attempts, running attempts sorted
+ * by start time, completed attempts sorted by end time.
+ *
+ * Normally applications should have a single running attempt; but failure to call sc.stop()
+ * may cause multiple running attempts to show up.
+ *
+ * @return Whether `a1` should precede `a2`.
+ */
+ private def compareAttemptInfo(
+ a1: FsApplicationAttemptInfo,
+ a2: FsApplicationAttemptInfo): Boolean = {
+ if (a1.completed == a2.completed) {
+ if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ } else {
+ !a1.completed
+ }
}
/**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+ private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationHistoryInfo(
+ new FsApplicationAttemptInfo(
logPath.getName(),
- appListener.appId.getOrElse(logPath.getName()),
appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
val logPath: String,
- id: String,
- name: String,
+ val name: String,
+ val appId: String,
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = true)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+ extends ApplicationAttemptInfo(
+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+ id: String,
+ override val name: String,
+ override val attempts: List[FsApplicationAttemptInfo])
+ extends ApplicationHistoryInfo(id, name, attempts)
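As an illustration of the ordering compareAttemptInfo produces, here is a self-contained restatement of the same comparison on a throwaway case class; the names and values are examples only.

    // Running attempts sort before completed ones; running attempts order by start
    // time, completed attempts by end time (both descending), as in compareAttemptInfo.
    case class Attempt(id: String, startTime: Long, endTime: Long, completed: Boolean)

    def precedes(a1: Attempt, a2: Attempt): Boolean =
      if (a1.completed == a2.completed) {
        if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
      } else {
        !a1.completed
      }

    val sorted = List(
      Attempt("attempt1", startTime = 1L, endTime = 2L, completed = true),
      Attempt("attempt2", startTime = 3L, endTime = 0L, completed = false)).sortWith(precedes)

    assert(sorted.map(_.id) == List("attempt2", "attempt1")) // running attempt listed first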
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 3781b4e8c1..0830cc1ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -34,18 +34,28 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
- val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
- val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+ val allApps = parent.getApplicationList()
+ .filter(_.attempts.head.completed != requestedIncomplete)
+ val allAppsSize = allApps.size
+
+ val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+ val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)
val actualPage = (actualFirst / pageSize) + 1
- val last = Math.min(actualFirst + pageSize, allApps.size) - 1
- val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+ val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+ val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1
- val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1)
+ val appTable =
+ if (hasMultipleAttempts) {
+ UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow)
+ } else {
+ UIUtils.listingTable(appHeader, appRow, appsToShow)
+ }
+
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
@@ -59,7 +69,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
- if (allApps.size > 0) {
+ if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
@@ -67,7 +77,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
requestedIncomplete)
<h4>
- Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
@@ -125,30 +135,85 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
"Spark User",
"Last Updated")
- private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean):
- Seq[Node] = {
+ private val appWithAttemptHeader = Seq(
+ "App ID",
+ "App Name",
+ "Attempt ID",
+ "Started",
+ "Completed",
+ "Duration",
+ "Spark User",
+ "Last Updated")
+
+ private def rangeIndices(
+ range: Seq[Int],
+ condition: Int => Boolean,
+ showIncomplete: Boolean): Seq[Node] = {
range.filter(condition).map(nextPage =>
<a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
}
- private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
- val startTime = UIUtils.formatDate(info.startTime)
- val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+ private def attemptRow(
+ renderAttemptIdColumn: Boolean,
+ info: ApplicationHistoryInfo,
+ attempt: ApplicationAttemptInfo,
+ isFirst: Boolean): Seq[Node] = {
+ val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+ val startTime = UIUtils.formatDate(attempt.startTime)
+ val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
val duration =
- if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
- val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+ if (attempt.endTime > 0) {
+ UIUtils.formatDuration(attempt.endTime - attempt.startTime)
+ } else {
+ "-"
+ }
+ val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
<tr>
- <td><a href={uiAddress}>{info.id}</a></td>
- <td>{info.name}</td>
- <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
- <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
- <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
- <td>{info.sparkUser}</td>
- <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
+ {
+ if (isFirst) {
+ if (info.attempts.size > 1 || renderAttemptIdColumn) {
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ <a href={uiAddress}>{info.id}</a></td>
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ {info.name}</td>
+ } else {
+ <td><a href={uiAddress}>{info.id}</a></td>
+ <td>{info.name}</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ {
+ if (renderAttemptIdColumn) {
+ if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
+ <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
+ {attempt.attemptId.get}</a></td>
+ } else {
+ <td>&nbsp;</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
+ <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
+ <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
+ {duration}</td>
+ <td>{attempt.sparkUser}</td>
+ <td sorttable_customkey={attempt.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}
+ private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(false, info, info.attempts.head, true)
+ }
+
+ private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(true, info, info.attempts.head, true) ++
+ info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
+ }
+
private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
"page=" + linkPage,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56bef57e55..754c8e9b66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,7 +52,11 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
- val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
+ val parts = key.split("/")
+ require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
+ val ui = provider
+ .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
+ .getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
@@ -69,6 +73,8 @@ class HistoryServer(
private val loaderServlet = new HttpServlet {
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ // Parse the URI created by getAttemptURI(). It contains an app ID and an optional
+ // attempt ID (separated by a slash).
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
@@ -76,18 +82,23 @@ class HistoryServer(
return
}
- val appId = parts(1)
+ val appKey =
+ if (parts.length == 3) {
+ s"${parts(1)}/${parts(2)}"
+ } else {
+ parts(1)
+ }
// Note we don't use the UI retrieved from the cache; the cache loader above will register
// the app's UI, and all we need to do is redirect the user to the same URI that was
// requested, and the proper data should be served at that point.
try {
- appCache.get(appId)
+ appCache.get(appKey)
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
} catch {
case e: Exception => e.getCause() match {
case nsee: NoSuchElementException =>
- val msg = <div class="row-fluid">Application {appId} not found.</div>
+ val msg = <div class="row-fluid">Application {appKey} not found.</div>
res.setStatus(HttpServletResponse.SC_NOT_FOUND)
UIUtils.basicSparkPage(msg, "Not Found").foreach(
n => res.getWriter().write(n.toString))
@@ -213,4 +224,9 @@ object HistoryServer extends Logging {
}
}
+ private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+ val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
+ s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
+ }
+
}
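The loader cache key and the redirect URL now share the same appId[/attemptId] convention (UI_PATH_PREFIX is "/history"); a standalone sketch with example IDs:

    // Mirror of getAttemptURI: the attempt ID, when present, is appended after a slash.
    def attemptURI(appId: String, attemptId: Option[String]): String = {
      val suffix = attemptId.map(id => s"/$id").getOrElse("")
      s"/history/$appId$suffix"
    }

    attemptURI("application_1422981780767_0002", Some("1"))
    // => "/history/application_1422981780767_0002/1"
    attemptURI("local-1425081759269", None)
    // => "/history/local-1425081759269"

    // The servlet strips the "/history/" prefix and hands "appId" or "appId/attemptId"
    // to the cache; the cache loader splits that key back apart:
    val parts = "application_1422981780767_0002/1".split("/")
    val (appId, attemptId) = (parts(0), if (parts.length > 1) Some(parts(1)) else None)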
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1c21c17956..dc6077f3d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -62,7 +62,7 @@ private[master] class Master(
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
-
+
private val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
@@ -86,7 +86,7 @@ private[master] class Master(
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling
- private val waitingDrivers = new ArrayBuffer[DriverInfo]
+ private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0
Utils.checkHost(host, "Expected hostname")
@@ -758,24 +758,24 @@ private[master] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
-
+
val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, app.desc.eventLogCodec)
+ eventLogDir, app.id, None, app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
- val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
+ val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
-
+
if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
}
-
+
val (eventLogFile, status) = if (inProgressExists) {
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
} else {
(eventLogFilePrefix, " (completed)")
}
-
+
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
@@ -859,8 +859,8 @@ private[master] class Master(
}
private def removeDriver(
- driverId: String,
- finalState: DriverState,
+ driverId: String,
+ finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa..9f218c64ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
+ var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
+ appAttemptId = applicationStart.appAttemptId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727db2..529a5b2bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
+ appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
- def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
- this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+ this(appId, appAttemptId, logBaseDir, sparkConf,
+ SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
* we won't know which codec to use to decompress the metadata needed to open the file in
* the first place.
*
+ * The log file name will identify the compression codec used for the contents, if any.
+ * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param appAttemptId A unique attempt ID for this run of appId, or None if the cluster manager does not provide one.
* @param compressionCodecName Name to identify the codec used to compress the contents
* of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
def getLogPath(
logBaseDir: URI,
appId: String,
+ appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
- val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
- // e.g. app_123, app_123.lzf
- val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
- logBaseDir.toString.stripSuffix("/") + "/" + logName
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isDefined) {
+ base + "_" + sanitize(appAttemptId.get) + codec
+ } else {
+ base + codec
+ }
+ }
+
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}
/**
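The resulting file names can be previewed with a standalone copy of the helpers above; the inputs are examples, and the expected outputs match the assertions in EventLoggingListenerSuite further down.

    // Same sanitation and naming rules as getLogPath/sanitize above.
    def sanitize(str: String): String =
      str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase

    def logName(appId: String, attemptId: Option[String], codec: Option[String]): String =
      sanitize(appId) +
        attemptId.map(id => "_" + sanitize(id)).getOrElse("") +
        codec.map("." + _).getOrElse("")

    logName("app_123", None, None)                      // "app_123"
    logName("app_123", Some("attempt1"), Some("lzf"))   // "app_123_attempt1.lzf"
    logName("a fine:mind$dollar{bills}.1", None, None)  // "a-fine-mind_dollar_bills__1"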
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493..646820520e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,12 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId
+ /**
+ * Get the attempt ID for this run, if the cluster manager supports multiple
+ * attempts. Applications run in client mode will not have attempt IDs.
+ *
+ * @return The application attempt id, if available.
+ */
+ def applicationAttemptId(): Option[String] = None
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b711ff209a..169d4fd3a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
- sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+ time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e..f25f3ed0d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,9 +73,17 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
-
+
/**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+ /**
+ * Get an application's attempt ID associated with the job.
+ *
+ * @return An application's Attempt ID
+ */
+ def applicationAttemptId(): Option[String]
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d836f..b4b8a63069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -513,6 +513,8 @@ private[spark] class TaskSchedulerImpl(
override def applicationId(): String = backend.applicationId()
+ override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 44d274956d..ee02fbd9ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
- ("User" -> applicationStart.sparkUser)
+ ("User" -> applicationStart.sparkUser) ~
+ ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, appId, time, sparkUser)
+ val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
+ SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
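A quick round-trip sketch of the new field; JsonProtocol is private[spark], so this assumes Spark-internal code, and the event values are made up.

    import org.apache.spark.scheduler.SparkListenerApplicationStart
    import org.apache.spark.util.JsonProtocol

    val start = SparkListenerApplicationStart(
      "example-app", Some("app-1"), 42L, "alice", Some("attempt-1"))

    val json = JsonProtocol.applicationStartToJson(start)
    // The JSON now carries an "App Attempt ID" field. Logs written by older Spark
    // versions simply omit it, and applicationStartFromJson maps that to None.
    assert(JsonProtocol.applicationStartFromJson(json) == start)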
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9e367a0d9a..a0a0afa488 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
+import java.util.concurrent.TimeUnit
import scala.io.Source
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io._
import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -47,10 +48,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
+ appAttemptId: Option[String],
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
@@ -59,22 +61,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = newLogFile("new1", inProgress = false)
+ val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
// Write a new-style application log.
- val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
+ Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
- val newAppIncomplete = newLogFile("new2", inProgress = true)
+ val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)
// Write an old-style application log.
@@ -82,7 +85,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -96,33 +99,45 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
- provider.checkForLogs()
+ updateAndCheck(provider) { list =>
+ list.size should be (5)
+ list.count(_.attempts.head.completed) should be (3)
+
+ def makeAppInfo(
+ id: String,
+ name: String,
+ start: Long,
+ end: Long,
+ lastMod: Long,
+ user: String,
+ completed: Boolean): ApplicationHistoryInfo = {
+ ApplicationHistoryInfo(id, name,
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+ }
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (5)
- list.count(_.completed) should be (3)
-
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
- newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
- oldAppComplete.lastModified(), "test", true))
- list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
- -1L, oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
- -1L, newAppIncomplete.lastModified(), "test", false))
-
- // Make sure the UI can be rendered.
- list.foreach { case info =>
- val appUi = provider.getAppUI(info.id)
- appUi should not be null
+ list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
+ true))
+ list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+ oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ newAppIncomplete.lastModified(), "test", false))
+
+ // Make sure the UI can be rendered.
+ list.foreach { case info =>
+ val appUi = provider.getAppUI(info.id, None)
+ appUi should not be null
+ appUi should not be None
+ }
}
}
@@ -138,7 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test"),
+ SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -159,52 +174,52 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = newLogFile("new1", inProgress = false)
+ val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = newLogFile("new2", inProgress = false)
+ val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
val provider = new FsHistoryProvider(createTestConf())
- provider.checkForLogs()
-
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (1)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ }
}
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- provider.checkForLogs()
- val appListBeforeRename = provider.getListing()
- appListBeforeRename.size should be (1)
- appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
- logFile1.renameTo(newLogFile("app1", inProgress = false))
- provider.checkForLogs()
- val appListAfterRename = provider.getListing()
- appListAfterRename.size should be (1)
- appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+ logFile1.renameTo(newLogFile("app1", None, inProgress = false))
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
}
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
val oldLog = new File(testDir, "old1")
@@ -215,6 +230,126 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.size should be (1)
}
+ test("apps with multiple attempts") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(attempt1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ }
+
+ val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ attempt2.delete()
+ writeFile(completedAttempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list should not be (null)
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+ writeFile(app2Attempt1, true, None,
+ SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(6L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list.head.attempts.size should be (1)
+ list.last.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt1"))
+
+ list.foreach { case app =>
+ app.attempts.foreach { attempt =>
+ val appUi = provider.getAppUI(app.id, attempt.attemptId)
+ appUi should not be null
+ }
+ }
+
+ }
+ }
+
+ test("log cleaner") {
+ val maxAge = TimeUnit.SECONDS.toMillis(10)
+ val clock = new ManualClock(maxAge / 2)
+ val provider = new FsHistoryProvider(
+ createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+ val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(log1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+ log1.setLastModified(0L)
+
+ val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ writeFile(log2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+ log2.setLastModified(clock.getTimeMillis())
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ }
+
+ // Move the clock forward so log1 exceeds the max age.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+ assert(!log1.exists())
+
+ // Do the same for the other log.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ assert(!log2.exists())
+ }
+
+ /**
+ * Asks the provider to check for logs and calls a function to perform checks on the updated
+ * app list. Example:
+ *
+ * updateAndCheck(provider) { list =>
+ * // asserts
+ * }
+ */
+ private def updateAndCheck(provider: FsHistoryProvider)
+ (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+ provider.checkForLogs()
+ provider.cleanLogs()
+ checkFn(provider.getListing().toSeq)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 20de46fdab..71ba9c1825 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
val request = mock[HttpServletRequest]
val ui = mock[SparkUI]
val link = "/history/app1"
- val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+ val info = new ApplicationHistoryInfo("app1", "app1",
+ List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
when(historyServer.getApplicationList()).thenReturn(Seq(info))
when(ui.basePath).thenReturn(link)
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3c52a8c446..2482603f42 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
/** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6d25edb7d2..b52a8d11d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}
test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1"))
+ Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1"))
+ "a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1", Some("lz4")))
+ "a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
/* ----------------- *
@@ -140,10 +140,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
- expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+ expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6de6d2fec6..dabe4574b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", new URI("testdir"), conf) {
+ extends EventLoggingListener("test", None, new URI("testdir"), conf) {
override def start() { }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2d039cb75a..0c9cf5bc68 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
+ 42L, "Garfield", Some("appAttempt"))
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
@@ -274,9 +275,11 @@ class JsonProtocolSuite extends FunSuite {
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
- val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+ // SparkListenerApplicationStart pre-Spark 1.4 does not have "appAttemptId".
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
+ .removeField({ _._1 == "App Attempt ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
}
@@ -1497,8 +1500,10 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerApplicationStart",
| "App Name": "The winner of all",
+ | "App ID": "appId",
| "Timestamp": 42,
- | "User": "Garfield"
+ | "User": "Garfield",
+ | "App Attempt ID": "appAttempt"
|}
"""
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 70cb57ffd8..27f804782f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -89,6 +89,10 @@ private[spark] class ApplicationMaster(
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+ // Propagate the attempt ID so that, when event logging is enabled, each
+ // attempt's events are written to a separate log file.
+ System.setProperty("spark.yarn.app.attemptId", appAttemptId.getAttemptId().toString())
}
logInfo("ApplicationAttemptId: " + appAttemptId)
@@ -208,10 +212,11 @@ private[spark] class ApplicationMaster(
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
+ val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
sparkConf.getOption("spark.yarn.historyServer.address")
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
- .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+ .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
allocator = client.register(yarnConf,
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index b1de81e6a8..aeb218a575 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -39,12 +39,18 @@ private[spark] class YarnClusterSchedulerBackend(
}
override def applicationId(): String =
- // In YARN Cluster mode, spark.yarn.app.id is expect to be set
- // before user application is launched.
- // So, if spark.yarn.app.id is not set, it is something wrong.
+ // In YARN Cluster mode, the application ID is expected to be set, so log an error if it's
+ // not found.
sc.getConf.getOption("spark.yarn.app.id").getOrElse {
logError("Application ID is not set.")
super.applicationId
}
+ override def applicationAttemptId(): Option[String] =
+ // In YARN Cluster mode, the attempt ID is expected to be set, so log an error if it's
+ // not found.
+ sc.getConf.getOption("spark.yarn.app.attemptId").orElse {
+ logError("Application attempt ID is not set.")
+ super.applicationAttemptId
+ }
}
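End to end, the attempt ID travels through a system property in YARN cluster mode; a small sketch of that hand-off (the property name comes from ApplicationMaster above, the value here is an example):

    // ApplicationMaster sets the property before the user class starts...
    System.setProperty("spark.yarn.app.attemptId", "1")

    // ...and YarnClusterSchedulerBackend reads it back through SparkConf, which picks
    // up spark.* system properties by default. In client mode the property is absent
    // and applicationAttemptId() stays None.
    val conf = new org.apache.spark.SparkConf()
    val attemptId: Option[String] = conf.getOption("spark.yarn.app.attemptId")
    assert(attemptId == Some("1"))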