author     Marcelo Vanzin <vanzin@cloudera.com>     2015-05-01 09:50:55 -0500
committer  Imran Rashid <irashid@cloudera.com>      2015-05-01 09:50:55 -0500
commit     3052f4916e7f2c7fbc4837f00f4463b7d0b34718 (patch)
tree       60796c615223dd96109cfcee74e6978528539425 /core
parent     7fe0f3f2b46c61a5cc4af9166781624409fda8a4 (diff)
[SPARK-4705] Handle multiple app attempts event logs, history server.
This change modifies the event logging listener to write the logs for different
application attempts to different files. The attempt ID is set by the scheduler
backend, so as long as the backend returns that ID to SparkContext, things should
work. Currently, the YARN backend does that.

The history server was also modified to model multiple attempts per application.
Each attempt has its own UI and a separate row in the listing table, so that users
can look at all the attempts separately. The UI "adapts" itself to avoid showing
attempt-specific info when all the applications being shown have a single attempt.

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: twinkle sachdeva <twinkle@kite.ggn.in.guavus.com>
Author: twinkle.sachdeva <twinkle.sachdeva@guavus.com>
Author: twinkle sachdeva <twinkle.sachdeva@guavus.com>

Closes #5432 from vanzin/SPARK-4705 and squashes the following commits:

7e289fa [Marcelo Vanzin] Review feedback.
f66dcc5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
bc885b7 [Marcelo Vanzin] Review feedback.
76a3651 [Marcelo Vanzin] Fix log cleaner, add test.
7c381ec [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
1aa309d [Marcelo Vanzin] Improve sorting of app attempts.
2ad77e7 [Marcelo Vanzin] Missed a reference to the old property name.
9d59d92 [Marcelo Vanzin] Scalastyle...
d5a9c37 [Marcelo Vanzin] Update JsonProtocol test, make property name consistent.
ba34b69 [Marcelo Vanzin] Use Option[String] for attempt id.
f1cb9b3 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
c14ec19 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
9092d39 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
86de638 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
07446c6 [Marcelo Vanzin] Disable striping for app id / name when multiple attempts exist.
9092af5 [Marcelo Vanzin] Fix HistoryServer test.
3a14503 [Marcelo Vanzin] Argh scalastyle.
657ec18 [Marcelo Vanzin] Fix yarn history URL, app links.
c3e0a82 [Marcelo Vanzin] Move app name to app info, more UI fixes.
ce5ee5d [Marcelo Vanzin] Misc UI, test, style fixes.
cbe8bba [Marcelo Vanzin] Attempt ID in listener event should be an option.
88b1de8 [Marcelo Vanzin] Add a test for apps with multiple attempts.
3245aa2 [Marcelo Vanzin] Make app attempts part of the history server model.
5fd5c6f [Marcelo Vanzin] Fix my broken rebase.
318525a [twinkle.sachdeva] SPARK-4705: 1) moved from directory structure to single file, as per the master branch. 2) Added the attempt id inside the SparkListenerApplicationStart, to make the info available independent of directory structure. 3) Changes in History Server to render the UI as per the snaphot II
6b2e521 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
4c1fc26 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
0eb7722 [twinkle sachdeva] SPARK-4705: Doing cherry-pick of fix into master
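In rough terms, the flow wired up by this patch looks like the sketch below (illustrative only; the application ID, user, and attempt values are made up, and only the pieces touched by this diff appear):

    // The scheduler backend reports the attempt ID (currently only the YARN backend
    // does; other backends fall back to the default of None).
    val attemptId: Option[String] = taskScheduler.applicationAttemptId()   // e.g. Some("1")

    // SparkContext forwards it to listeners...
    listenerBus.post(SparkListenerApplicationStart(
      "my-app", Some("application_1430000000000_0001"), startTime, "someUser", attemptId))

    // ...and to the event logger, which writes each attempt to its own file:
    val logger = new EventLoggingListener(
      "application_1430000000000_0001", attemptId, eventLogDir, conf, hadoopConf)
    // resulting paths, e.g.:
    //   <logDir>/application_1430000000000_0001_1
    //   <logDir>/application_1430000000000_0001_2.lzf   (second attempt, LZF-compressed)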
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                              |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala          | 211
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                | 111
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala              |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                      |  20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala        |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala            |  28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala                |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala                   |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala               |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                         |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala     | 243
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala         |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala               |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala       |  18
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala             |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                    |  11
19 files changed, 531 insertions, 197 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe24260cdb..3f7cba6dbc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
+ private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
def applicationId: String = _applicationId
+ def applicationAttemptId: Option[String] = _applicationAttemptId
def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
+ _applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_env.blockManager.initialize(_applicationId)
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_eventLogger =
if (isEventLogEnabled) {
val logger =
- new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+ new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+ _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser))
+ startTime, sparkUser, applicationAttemptId))
}
/** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ea6c85ee51..6a5011af17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history
import org.apache.spark.ui.SparkUI
-private[history] case class ApplicationHistoryInfo(
- id: String,
- name: String,
+private[history] case class ApplicationAttemptInfo(
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
+private[history] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ attempts: List[ApplicationAttemptInfo])
+
private[history] abstract class ApplicationHistoryProvider {
/**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
+ * @param attemptId The application attempt ID (or None if there is no attempt ID).
* @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
/**
* Called when the server is shutting down.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fb2cbbcccc..993763f3aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
*/
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
- with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+ extends ApplicationHistoryProvider with Logging {
+
+ def this(conf: SparkConf) = {
+ this(conf, new SystemClock())
+ }
import FsHistoryProvider._
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
- // List of applications to be deleted by event log cleaner.
- private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+ // List of application logs to be deleted by event log cleaner.
+ private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
- override def getAppUI(appId: String): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
- applications.get(appId).map { info =>
- val replayBus = new ReplayListenerBus()
- val ui = {
- val conf = this.conf.clone()
- val appSecManager = new SecurityManager(conf)
- SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
- s"${HistoryServer.UI_PATH_PREFIX}/$appId")
- // Do not call ui.bind() to avoid creating a new server for each application
- }
+ applications.get(appId).flatMap { appInfo =>
+ appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ val replayBus = new ReplayListenerBus()
+ val ui = {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+ HistoryServer.getAttemptURI(appId, attempt.attemptId))
+ // Do not call ui.bind() to avoid creating a new server for each application
+ }
- val appListener = new ApplicationEventListener()
- replayBus.addListener(appListener)
- val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+ val appListener = new ApplicationEventListener()
+ replayBus.addListener(appListener)
+ val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
- ui.setAppName(s"${appInfo.name} ($appId)")
+ ui.setAppName(s"${appInfo.name} ($appId)")
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
} catch {
case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
val bus = new ReplayListenerBus()
- val newApps = logs.flatMap { fileStatus =>
+ val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
e)
None
}
- }.toSeq.sortWith(compareAppInfo)
-
- // When there are new logs, merge the new list with the existing one, maintaining
- // the expected ordering (descending end time). Maintaining the order is important
- // to avoid having to sort the list every time there is a request for the log list.
- if (newApps.nonEmpty) {
- val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
- if (!mergedApps.contains(info.id) ||
- mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
- !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- mergedApps += (info.id -> info)
- }
- }
+ }
- val newIterator = newApps.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next())
- } else {
- addIfAbsent(oldIterator.next())
+ if (newAttempts.isEmpty) {
+ return
+ }
+
+ // Build a map containing all apps that contain new attempts. The app information in this map
+ // contains both the new app attempt, and those that were already loaded in the existing apps
+ // map. If an attempt has been updated, it replaces the old attempt in the list.
+ val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+ newAttempts.foreach { attempt =>
+ val appInfo = newAppMap.get(attempt.appId)
+ .orElse(applications.get(attempt.appId))
+ .map { app =>
+ val attempts =
+ app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+ new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+ attempts.sortWith(compareAttemptInfo))
}
+ .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+ newAppMap(attempt.appId) = appInfo
+ }
+
+ // Merge the new app list with the existing one, maintaining the expected ordering (descending
+ // end time). Maintaining the order is important to avoid having to sort the list every time
+ // there is a request for the log list.
+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id)) {
+ mergedApps += (info.id -> info)
}
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
+ }
- applications = mergedApps
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newAppMap.contains(oldIterator.head.id)) {
+ oldIterator.next()
+ } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
+ }
}
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
}
/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
- private def cleanLogs(): Unit = {
+ private[history] def cleanLogs(): Unit = {
try {
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
- val now = System.currentTimeMillis()
+ val now = clock.getTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+ now - attempt.lastUpdated > maxAge && attempt.completed
+ }
+
// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
- applications.values.foreach { info =>
- if (now - info.lastUpdated <= maxAge || !info.completed) {
- appsToRetain += (info.id -> info)
- } else {
- appsToClean += info
+ applications.values.foreach { app =>
+ val (toClean, toRetain) = app.attempts.partition(shouldClean)
+ attemptsToClean ++= toClean
+
+ if (toClean.isEmpty) {
+ appsToRetain += (app.id -> app)
+ } else if (toRetain.nonEmpty) {
+ appsToRetain += (app.id ->
+ new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
}
}
applications = appsToRetain
- val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
- appsToClean.foreach { info =>
+ val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+ attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, info.logPath)
+ val path = new Path(logDir, attempt.logPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case e: AccessControlException =>
- logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+ logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
case t: IOException =>
- logError(s"IOException in cleaning logs of ${info.logPath}", t)
- leftToClean += info
+ logError(s"IOException in cleaning ${attempt.logPath}", t)
+ leftToClean += attempt
}
}
- appsToClean = leftToClean
+ attemptsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
- if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+ val a1 = i1.attempts.head
+ val a2 = i2.attempts.head
+ if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ }
+
+ /**
+ * Comparison function that defines the sort order for application attempts within the same
+ * application. Order is: running attempts before complete attempts, running attempts sorted
+ * by start time, completed attempts sorted by end time.
+ *
+ * Normally applications should have a single running attempt; but failure to call sc.stop()
+ * may cause multiple running attempts to show up.
+ *
+ * @return Whether `a1` should precede `a2`.
+ */
+ private def compareAttemptInfo(
+ a1: FsApplicationAttemptInfo,
+ a2: FsApplicationAttemptInfo): Boolean = {
+ if (a1.completed == a2.completed) {
+ if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+ } else {
+ !a1.completed
+ }
}
/**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+ private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationHistoryInfo(
+ new FsApplicationAttemptInfo(
logPath.getName(),
- appListener.appId.getOrElse(logPath.getName()),
appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
val logPath: String,
- id: String,
- name: String,
+ val name: String,
+ val appId: String,
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = true)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+ extends ApplicationAttemptInfo(
+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+ id: String,
+ override val name: String,
+ override val attempts: List[FsApplicationAttemptInfo])
+ extends ApplicationHistoryInfo(id, name, attempts)
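With the classes above, an application with a finished first attempt and a still-running second attempt is modeled roughly as follows (a sketch with made-up values; running attempts sort before completed ones per compareAttemptInfo, and an unfinished attempt keeps endTime = -1L):

    ApplicationHistoryInfo(
      id = "application_1430000000000_0001",
      name = "my-app",
      attempts = List(
        ApplicationAttemptInfo(Some("2"), startTime = 3L, endTime = -1L,
          lastUpdated = 10L, sparkUser = "someUser", completed = false),
        ApplicationAttemptInfo(Some("1"), startTime = 1L, endTime = 2L,
          lastUpdated = 5L, sparkUser = "someUser", completed = true)))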
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 3781b4e8c1..0830cc1ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -34,18 +34,28 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
- val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
- val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+ val allApps = parent.getApplicationList()
+ .filter(_.attempts.head.completed != requestedIncomplete)
+ val allAppsSize = allApps.size
+
+ val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+ val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)
val actualPage = (actualFirst / pageSize) + 1
- val last = Math.min(actualFirst + pageSize, allApps.size) - 1
- val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+ val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+ val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1
- val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1)
+ val appTable =
+ if (hasMultipleAttempts) {
+ UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow)
+ } else {
+ UIUtils.listingTable(appHeader, appRow, appsToShow)
+ }
+
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
@@ -59,7 +69,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
- if (allApps.size > 0) {
+ if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
@@ -67,7 +77,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
requestedIncomplete)
<h4>
- Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
@@ -125,30 +135,85 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
"Spark User",
"Last Updated")
- private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean):
- Seq[Node] = {
+ private val appWithAttemptHeader = Seq(
+ "App ID",
+ "App Name",
+ "Attempt ID",
+ "Started",
+ "Completed",
+ "Duration",
+ "Spark User",
+ "Last Updated")
+
+ private def rangeIndices(
+ range: Seq[Int],
+ condition: Int => Boolean,
+ showIncomplete: Boolean): Seq[Node] = {
range.filter(condition).map(nextPage =>
<a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
}
- private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
- val startTime = UIUtils.formatDate(info.startTime)
- val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+ private def attemptRow(
+ renderAttemptIdColumn: Boolean,
+ info: ApplicationHistoryInfo,
+ attempt: ApplicationAttemptInfo,
+ isFirst: Boolean): Seq[Node] = {
+ val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+ val startTime = UIUtils.formatDate(attempt.startTime)
+ val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
val duration =
- if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
- val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+ if (attempt.endTime > 0) {
+ UIUtils.formatDuration(attempt.endTime - attempt.startTime)
+ } else {
+ "-"
+ }
+ val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
<tr>
- <td><a href={uiAddress}>{info.id}</a></td>
- <td>{info.name}</td>
- <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
- <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
- <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
- <td>{info.sparkUser}</td>
- <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
+ {
+ if (isFirst) {
+ if (info.attempts.size > 1 || renderAttemptIdColumn) {
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ <a href={uiAddress}>{info.id}</a></td>
+ <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+ {info.name}</td>
+ } else {
+ <td><a href={uiAddress}>{info.id}</a></td>
+ <td>{info.name}</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ {
+ if (renderAttemptIdColumn) {
+ if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
+ <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
+ {attempt.attemptId.get}</a></td>
+ } else {
+ <td>&nbsp;</td>
+ }
+ } else {
+ Nil
+ }
+ }
+ <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
+ <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
+ <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
+ {duration}</td>
+ <td>{attempt.sparkUser}</td>
+ <td sorttable_customkey={attempt.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}
+ private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(false, info, info.attempts.head, true)
+ }
+
+ private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ attemptRow(true, info, info.attempts.head, true) ++
+ info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
+ }
+
private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
"page=" + linkPage,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56bef57e55..754c8e9b66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,7 +52,11 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
- val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
+ val parts = key.split("/")
+ require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
+ val ui = provider
+ .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
+ .getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
@@ -69,6 +73,8 @@ class HistoryServer(
private val loaderServlet = new HttpServlet {
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ // Parse the URI created by getAttemptURI(). It contains an app ID and an optional
+ // attempt ID (separated by a slash).
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
@@ -76,18 +82,23 @@ class HistoryServer(
return
}
- val appId = parts(1)
+ val appKey =
+ if (parts.length == 3) {
+ s"${parts(1)}/${parts(2)}"
+ } else {
+ parts(1)
+ }
// Note we don't use the UI retrieved from the cache; the cache loader above will register
// the app's UI, and all we need to do is redirect the user to the same URI that was
// requested, and the proper data should be served at that point.
try {
- appCache.get(appId)
+ appCache.get(appKey)
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
} catch {
case e: Exception => e.getCause() match {
case nsee: NoSuchElementException =>
- val msg = <div class="row-fluid">Application {appId} not found.</div>
+ val msg = <div class="row-fluid">Application {appKey} not found.</div>
res.setStatus(HttpServletResponse.SC_NOT_FOUND)
UIUtils.basicSparkPage(msg, "Not Found").foreach(
n => res.getWriter().write(n.toString))
@@ -213,4 +224,9 @@ object HistoryServer extends Logging {
}
}
+ private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+ val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
+ s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
+ }
+
}
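For reference, the URIs and cache keys this scheme produces look like the following (assuming UI_PATH_PREFIX is "/history", consistent with the HistoryServerSuite link further down):

    HistoryServer.getAttemptURI("app-1", None)              // "/history/app-1"
    HistoryServer.getAttemptURI("app-1", Some("attempt2"))  // "/history/app-1/attempt2"
    // The loader servlet maps the request path "/app-1/attempt2" to the cache key
    // "app-1/attempt2", which appLoader splits back into
    // provider.getAppUI("app-1", Some("attempt2")).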
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1c21c17956..dc6077f3d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -62,7 +62,7 @@ private[master] class Master(
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
-
+
private val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
@@ -86,7 +86,7 @@ private[master] class Master(
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling
- private val waitingDrivers = new ArrayBuffer[DriverInfo]
+ private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0
Utils.checkHost(host, "Expected hostname")
@@ -758,24 +758,24 @@ private[master] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
-
+
val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, app.desc.eventLogCodec)
+ eventLogDir, app.id, None, app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
- val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
+ val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
-
+
if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
}
-
+
val (eventLogFile, status) = if (inProgressExists) {
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
} else {
(eventLogFilePrefix, " (completed)")
}
-
+
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
@@ -859,8 +859,8 @@ private[master] class Master(
}
private def removeDriver(
- driverId: String,
- finalState: DriverState,
+ driverId: String,
+ finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa..9f218c64ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
+ var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
+ appAttemptId = applicationStart.appAttemptId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727db2..529a5b2bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
+ appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
- def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
- this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+ this(appId, appAttemptId, logBaseDir, sparkConf,
+ SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
* we won't know which codec to use to decompress the metadata needed to open the file in
* the first place.
*
+ * The log file name will identify the compression codec used for the contents, if any.
+ * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param appAttemptId A unique attempt id of appId. May be the empty string.
* @param compressionCodecName Name to identify the codec used to compress the contents
* of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
def getLogPath(
logBaseDir: URI,
appId: String,
+ appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
- val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
- // e.g. app_123, app_123.lzf
- val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
- logBaseDir.toString.stripSuffix("/") + "/" + logName
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isDefined) {
+ base + "_" + sanitize(appAttemptId.get) + codec
+ } else {
+ base + codec
+ }
+ }
+
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}
/**
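A few examples of the names the new getLogPath produces (the first and last lines match the updated EventLoggingListenerSuite assertions further down; the attempt-ID example follows the same rules):

    EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None)
      // "file:/base-dir/app1"
    EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("attempt1"), Some("lzf"))
      // "file:/base-dir/app1_attempt1.lzf"
    EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "a fine:mind$dollar{bills}.1", None)
      // "file:/base-dir/a-fine-mind_dollar_bills__1"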
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493..646820520e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,12 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId
+ /**
+ * Get the attempt ID for this run, if the cluster manager supports multiple
+ * attempts. Applications run in client mode will not have attempt IDs.
+ *
+ * @return The application attempt id, if available.
+ */
+ def applicationAttemptId(): Option[String] = None
+
}
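A backend that knows its attempt ID only needs to override the new method; a hypothetical, minimal sketch (the real implementation lives in the YARN backend, outside this core-only diff):

    private[spark] class MyYarnishBackend extends SchedulerBackend {
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def reviveOffers(): Unit = {}
      override def defaultParallelism(): Int = 2
      // Report the attempt ID handed out by the cluster manager.
      override def applicationAttemptId(): Option[String] = Some("1")
    }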
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b711ff209a..169d4fd3a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
- sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+ time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e..f25f3ed0d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,9 +73,17 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
-
+
/**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+ /**
+ * Get an application's attempt ID associated with the job.
+ *
+ * @return An application's Attempt ID
+ */
+ def applicationAttemptId(): Option[String]
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d836f..b4b8a63069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -513,6 +513,8 @@ private[spark] class TaskSchedulerImpl(
override def applicationId(): String = backend.applicationId()
+ override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 44d274956d..ee02fbd9ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
- ("User" -> applicationStart.sparkUser)
+ ("User" -> applicationStart.sparkUser) ~
+ ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, appId, time, sparkUser)
+ val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
+ SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
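Serialized, an application start event now carries the attempt ID; for the fixture used in JsonProtocolSuite below, applicationStartToJson yields:

    {
      "Event": "SparkListenerApplicationStart",
      "App Name": "The winner of all",
      "App ID": "appId",
      "Timestamp": 42,
      "User": "Garfield",
      "App Attempt ID": "appAttempt"
    }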
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9e367a0d9a..a0a0afa488 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
+import java.util.concurrent.TimeUnit
import scala.io.Source
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io._
import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -47,10 +48,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
+ appAttemptId: Option[String],
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
@@ -59,22 +61,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = newLogFile("new1", inProgress = false)
+ val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
// Write a new-style application log.
- val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
+ Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
- val newAppIncomplete = newLogFile("new2", inProgress = true)
+ val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)
// Write an old-style application log.
@@ -82,7 +85,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -96,33 +99,45 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
- provider.checkForLogs()
+ updateAndCheck(provider) { list =>
+ list.size should be (5)
+ list.count(_.attempts.head.completed) should be (3)
+
+ def makeAppInfo(
+ id: String,
+ name: String,
+ start: Long,
+ end: Long,
+ lastMod: Long,
+ user: String,
+ completed: Boolean): ApplicationHistoryInfo = {
+ ApplicationHistoryInfo(id, name,
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+ }
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (5)
- list.count(_.completed) should be (3)
-
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
- newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
- oldAppComplete.lastModified(), "test", true))
- list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
- -1L, oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
- -1L, newAppIncomplete.lastModified(), "test", false))
-
- // Make sure the UI can be rendered.
- list.foreach { case info =>
- val appUi = provider.getAppUI(info.id)
- appUi should not be null
+ list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
+ true))
+ list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+ oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ newAppIncomplete.lastModified(), "test", false))
+
+ // Make sure the UI can be rendered.
+ list.foreach { case info =>
+ val appUi = provider.getAppUI(info.id, None)
+ appUi should not be null
+ appUi should not be None
+ }
}
}
@@ -138,7 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test"),
+ SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -159,52 +174,52 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = newLogFile("new1", inProgress = false)
+ val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = newLogFile("new2", inProgress = false)
+ val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
val provider = new FsHistoryProvider(createTestConf())
- provider.checkForLogs()
-
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (1)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ }
}
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- provider.checkForLogs()
- val appListBeforeRename = provider.getListing()
- appListBeforeRename.size should be (1)
- appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
- logFile1.renameTo(newLogFile("app1", inProgress = false))
- provider.checkForLogs()
- val appListAfterRename = provider.getListing()
- appListAfterRename.size should be (1)
- appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+ logFile1.renameTo(newLogFile("app1", None, inProgress = false))
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
}
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
val oldLog = new File(testDir, "old1")
@@ -215,6 +230,126 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.size should be (1)
}
+ test("apps with multiple attempts") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(attempt1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ }
+
+ val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ attempt2.delete()
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list should not be (null)
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(6L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list.head.attempts.size should be (1)
+ list.last.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt1"))
+
+ list.foreach { case app =>
+ app.attempts.foreach { attempt =>
+ val appUi = provider.getAppUI(app.id, attempt.attemptId)
+ appUi should not be null
+ }
+ }
+
+ }
+ }
+
+ test("log cleaner") {
+ val maxAge = TimeUnit.SECONDS.toMillis(10)
+ val clock = new ManualClock(maxAge / 2)
+ val provider = new FsHistoryProvider(
+ createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+ val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(log1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+ log1.setLastModified(0L)
+
+ val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ writeFile(log2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+ log2.setLastModified(clock.getTimeMillis())
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ }
+
+ // Move the clock forward so log1 exceeds the max age.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+ assert(!log1.exists())
+
+ // Do the same for the other log.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ assert(!log2.exists())
+ }
+
+ /**
+ * Asks the provider to check for logs and calls a function to perform checks on the updated
+ * app list. Example:
+ *
+ * updateAndCheck(provider) { list =>
+ * // asserts
+ * }
+ */
+ private def updateAndCheck(provider: FsHistoryProvider)
+ (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+ provider.checkForLogs()
+ provider.cleanLogs()
+ checkFn(provider.getListing().toSeq)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 20de46fdab..71ba9c1825 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
val request = mock[HttpServletRequest]
val ui = mock[SparkUI]
val link = "/history/app1"
- val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+ val info = new ApplicationHistoryInfo("app1", "app1",
+ List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
when(historyServer.getApplicationList()).thenReturn(Seq(info))
when(ui.basePath).thenReturn(link)
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3c52a8c446..2482603f42 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
/** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6d25edb7d2..b52a8d11d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}
test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1"))
+ Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1"))
+ "a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1", Some("lz4")))
+ "a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
/* ----------------- *
@@ -140,10 +140,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
- expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+ expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6de6d2fec6..dabe4574b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", new URI("testdir"), conf) {
+ extends EventLoggingListener("test", None, new URI("testdir"), conf) {
override def start() { }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2d039cb75a..0c9cf5bc68 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
+ 42L, "Garfield", Some("appAttempt"))
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
@@ -274,9 +275,11 @@ class JsonProtocolSuite extends FunSuite {
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
- val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+ // SparkListenerApplicationStart pre-Spark 1.4 does not have "appAttemptId".
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
+ .removeField({ _._1 == "App Attempt ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
}
@@ -1497,8 +1500,10 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerApplicationStart",
| "App Name": "The winner of all",
+ | "App ID": "appId",
| "Timestamp": 42,
- | "User": "Garfield"
+ | "User": "Garfield",
+ | "App Attempt ID": "appAttempt"
|}
"""