author     Marcelo Vanzin <vanzin@cloudera.com>    2014-09-03 14:57:38 -0700
committer  Andrew Or <andrewor14@gmail.com>        2014-09-03 14:57:38 -0700
commit     f2b5b619a9efee91573c0e546792e68e72afce21 (patch)
tree       9bc103807239cde4a425d031518a9d0cc6087be4
parent     ccc69e26ec2fadd90886990b90a5a600efd08aba (diff)
[SPARK-3388] Expose application ID in ApplicationStart event, use it in history server.
This change exposes the application ID generated by the Spark Master, Mesos or Yarn via the SparkListenerApplicationStart event. It then uses that information to expose the application via its ID in the history server, instead of using the internal directory name generated by the event logger as an application id. This allows someone who knows the application ID to easily figure out the URL for the application's entry in the HS, aside from looking better.

In Yarn mode, this is used to generate a direct link from the RM application list to the Spark history server entry (thus providing a fix for SPARK-2150).

Note this sort of assumes that the different managers will generate app ids that are sufficiently different from each other that clashes will not occur.

Author: Marcelo Vanzin <vanzin@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Andrew Or <andrewor14@gmail.com>

Closes #1218 from vanzin/yarn-hs-link-2 and squashes the following commits:

2d19f3c [Marcelo Vanzin] Review feedback.
6706d3a [Marcelo Vanzin] Implement applicationId() in base classes.
56fe42e [Marcelo Vanzin] Fix cluster mode history address, plus a cleanup.
44112a8 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
8278316 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a86bbcf [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a0056e6 [Marcelo Vanzin] Unbreak test.
4b10cfd [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
cb0cab2 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
25f2826 [Marcelo Vanzin] Add MIMA excludes.
f0ba90f [Marcelo Vanzin] Use BufferedIterator.
c90a08d [Marcelo Vanzin] Remove unused code.
3f8ec66 [Marcelo Vanzin] Review feedback.
21aa71b [Marcelo Vanzin] Fix JSON test.
b022bae [Marcelo Vanzin] Undo SparkContext cleanup.
c6d7478 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
4e3483f [Marcelo Vanzin] Fix test.
57517b8 [Marcelo Vanzin] Review feedback. Mostly, more consistent use of Scala's Option.
311e49d [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
d35d86f [Marcelo Vanzin] Fix yarn backend after rebase.
36dc362 [Marcelo Vanzin] Don't use Iterator::takeWhile().
0afd696 [Marcelo Vanzin] Wait until master responds before returning from start().
abc4697 [Marcelo Vanzin] Make FsHistoryProvider keep a map of applications by id.
26b266e [Marcelo Vanzin] Use Mesos framework ID as Spark application ID.
b3f3664 [Marcelo Vanzin] [yarn] Make the RM link point to the app direcly in the HS.
2fb7de4 [Marcelo Vanzin] Expose the application ID in the ApplicationStart event.
ed10348 [Marcelo Vanzin] Expose application id to spark context.
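Aside (not part of the patch): to make the new event shape concrete, a SparkListener can read the cluster-manager-provided ID straight off the event introduced below. The listener class and its name here are hypothetical; appId is an Option because some backends (e.g. local mode) provide no ID at all.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener, for illustration only: logs the ID assigned by the cluster manager.
class AppIdLogger extends SparkListener {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    // appId is None when the backend does not provide an ID; the history server
    // then falls back to the event log directory name as before.
    val id = applicationStart.appId.getOrElse("<none>")
    println(s"Application '${applicationStart.appName}' started as $id by ${applicationStart.sparkUser}")
  }
}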
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 176
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala | 35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 9
-rw-r--r--  project/MimaExcludes.scala | 6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 24
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 5
25 files changed, 228 insertions, 133 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cb4fb7cfbd..529febff94 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {
/** Post the application start event */
private def postApplicationStart() {
- listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+ // Note: this code assumes that the task scheduler has been initialized and has contacted
+ // the cluster manager to get an application ID (in case the cluster manager provides one).
+ listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+ startTime, sparkUser))
}
/** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index a0e8bd403a..fbe39b2764 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
*
* @return List of all know applications.
*/
- def getListing(): Seq[ApplicationHistoryInfo]
+ def getListing(): Iterable[ApplicationHistoryInfo]
/**
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
- * @return The application's UI, or null if application is not found.
+ * @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String): SparkUI
+ def getAppUI(appId: String): Option[SparkUI]
/**
* Called when the server is shutting down.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 05c8a90782..481f6c93c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {
+ private val NOT_STARTED = "<Not Started>"
+
// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
- // List of applications, in order from newest to oldest.
- @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+ // The modification time of the newest log detected during the last scan. This is used
+ // to ignore logs that are older during subsequent scans, to avoid processing data that
+ // is already known.
+ private var lastModifiedTime = -1L
+
+ // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+ // into the map in order, so the LinkedHashMap maintains the correct ordering.
+ @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+ = new mutable.LinkedHashMap()
/**
* A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
logCheckingThread.start()
}
- override def getListing() = appList
+ override def getListing() = applications.values
- override def getAppUI(appId: String): SparkUI = {
+ override def getAppUI(appId: String): Option[SparkUI] = {
try {
- val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
- val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
- ui
+ applications.get(appId).map { info =>
+ val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+ new Path(logDir, info.logDir)))
+ val ui = {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ new SparkUI(conf, appSecManager, replayBus, appId,
+ s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+ // Do not call ui.bind() to avoid creating a new server for each application
+ }
+
+ replayBus.replay()
+
+ ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
} catch {
- case e: FileNotFoundException => null
+ case e: FileNotFoundException => None
}
}
@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs.filter { dir =>
- fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
- }
- val currentApps = Map[String, ApplicationHistoryInfo](
- appList.map(app => app.id -> app):_*)
-
- // For any application that either (i) is not listed or (ii) has changed since the last time
- // the listing was created (defined by the log dir's modification time), load the app's info.
- // Otherwise just reuse what's already in memory.
- val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
- for (dir <- logInfos) {
- val curr = currentApps.getOrElse(dir.getPath().getName(), null)
- if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+ // Load all new logs from the log directory. Only directories that have a modification time
+ // later than the last known log directory will be loaded.
+ var newLastModifiedTime = lastModifiedTime
+ val logInfos = logDirs
+ .filter { dir =>
+ if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+ val modTime = getModificationTime(dir)
+ newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+ modTime > lastModifiedTime
+ } else {
+ false
+ }
+ }
+ .flatMap { dir =>
try {
- val (app, _) = loadAppInfo(dir, renderUI = false)
- newApps += app
+ val (replayBus, appListener) = createReplayBus(dir)
+ replayBus.replay()
+ Some(new FsApplicationHistoryInfo(
+ dir.getPath().getName(),
+ appListener.appId.getOrElse(dir.getPath().getName()),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(dir),
+ appListener.sparkUser.getOrElse(NOT_STARTED)))
} catch {
- case e: Exception => logError(s"Failed to load app info from directory $dir.")
+ case e: Exception =>
+ logInfo(s"Failed to load application log data from $dir.", e)
+ None
+ }
+ }
+ .sortBy { info => -info.endTime }
+
+ lastModifiedTime = newLastModifiedTime
+
+ // When there are new logs, merge the new list with the existing one, maintaining
+ // the expected ordering (descending end time). Maintaining the order is important
+ // to avoid having to sort the list every time there is a request for the log list.
+ if (!logInfos.isEmpty) {
+ val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo) = {
+ if (!newApps.contains(info.id)) {
+ newApps += (info.id -> info)
}
- } else {
- newApps += curr
}
- }
- appList = newApps.sortBy { info => -info.endTime }
+ val newIterator = logInfos.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newIterator.head.endTime > oldIterator.head.endTime) {
+ addIfAbsent(newIterator.next)
+ } else {
+ addIfAbsent(oldIterator.next)
+ }
+ }
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = newApps
+ }
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}
- /**
- * Parse the application's logs to find out the information we need to build the
- * listing page.
- *
- * When creating the listing of available apps, there is no need to load the whole UI for the
- * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
- * clicks on a specific application.
- *
- * @param logDir Directory with application's log files.
- * @param renderUI Whether to create the SparkUI for the application.
- * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
- */
- private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
- val path = logDir.getPath
- val appId = path.getName
+ private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+ val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
-
- val ui: SparkUI = if (renderUI) {
- val conf = this.conf.clone()
- val appSecManager = new SecurityManager(conf)
- new SparkUI(conf, appSecManager, replayBus, appId,
- HistoryServer.UI_PATH_PREFIX + s"/$appId")
- // Do not call ui.bind() to avoid creating a new server for each application
- } else {
- null
- }
-
- replayBus.replay()
- val appInfo = ApplicationHistoryInfo(
- appId,
- appListener.appName,
- appListener.startTime,
- appListener.endTime,
- getModificationTime(logDir),
- appListener.sparkUser)
-
- if (ui != null) {
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
- ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- }
- (appInfo, ui)
+ (replayBus, appListener)
}
/** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
}
+
+private class FsApplicationHistoryInfo(
+ val logDir: String,
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String)
+ extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
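Aside (not part of the diff): the merge in checkForLogs() above relies on both inputs already being sorted by descending end time, so one pass over two buffered iterators keeps the combined map ordered without re-sorting on every scan. Stripped of the surrounding class, the same idea looks roughly like this sketch with simplified types:

import scala.collection.mutable

object MergeSketch {
  case class AppInfo(id: String, endTime: Long)

  // Merge two sequences that are each sorted by descending endTime, keeping the first
  // occurrence of every id, so the result stays sorted without a full re-sort.
  def mergeByEndTime(newApps: Seq[AppInfo], oldApps: Seq[AppInfo]): Seq[AppInfo] = {
    val merged = new mutable.LinkedHashMap[String, AppInfo]()
    def addIfAbsent(info: AppInfo): Unit = {
      if (!merged.contains(info.id)) merged += (info.id -> info)
    }
    val newIt = newApps.iterator.buffered
    val oldIt = oldApps.iterator.buffered
    while (newIt.hasNext && oldIt.hasNext) {
      if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next())
      else addIfAbsent(oldIt.next())
    }
    newIt.foreach(addIfAbsent)
    oldIt.foreach(addIfAbsent)
    merged.values.toSeq
  }

  // Example: a newer app slots in ahead of existing entries without re-sorting:
  // mergeByEndTime(Seq(AppInfo("app-3", 300L)), Seq(AppInfo("app-2", 200L), AppInfo("app-1", 100L)))
  // => Seq(AppInfo("app-3", 300), AppInfo("app-2", 200), AppInfo("app-1", 100))
}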
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index d1a64c1912..ce00c0ffd2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
- val ui = provider.getAppUI(key)
- if (ui == null) {
- throw new NoSuchElementException()
- }
+ val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 162158babc..6d39a5e3fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
- var appName = "<Not Started>"
- var sparkUser = "<Not Started>"
- var startTime = -1L
- var endTime = -1L
- var viewAcls = ""
- var adminAcls = ""
-
- def applicationStarted = startTime != -1
-
- def applicationCompleted = endTime != -1
-
- def applicationDuration: Long = {
- val difference = endTime - startTime
- if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
- }
+ var appName: Option[String] = None
+ var appId: Option[String] = None
+ var sparkUser: Option[String] = None
+ var startTime: Option[Long] = None
+ var endTime: Option[Long] = None
+ var viewAcls: Option[String] = None
+ var adminAcls: Option[String] = None
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
- appName = applicationStart.appName
- startTime = applicationStart.time
- sparkUser = applicationStart.sparkUser
+ appName = Some(applicationStart.appName)
+ appId = applicationStart.appId
+ startTime = Some(applicationStart.time)
+ sparkUser = Some(applicationStart.sparkUser)
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
- endTime = applicationEnd.time
+ endTime = Some(applicationEnd.time)
}
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
val environmentDetails = environmentUpdate.environmentDetails
val allProperties = environmentDetails("Spark Properties").toMap
- viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
- adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+ viewAcls = allProperties.get("spark.ui.view.acls")
+ adminAcls = allProperties.get("spark.admin.acls")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index e41e0a9841..a0be8307ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true
+
+ /**
+ * The application ID associated with the job, if any.
+ *
+ * @return The application ID, or None if the backend does not provide an ID.
+ */
+ def applicationId(): Option[String] = None
+
}
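Aside, purely illustrative: a backend that learns its ID from the cluster manager overrides the new hook, while every other backend keeps the None default. SchedulerBackend is private[spark], so a sketch like this would have to live inside Spark's own source tree; the class name and fields here are hypothetical.

package org.apache.spark.scheduler

// Hypothetical backend; only the pieces relevant to applicationId() are fleshed out.
private[spark] class MyClusterBackend extends SchedulerBackend {
  // Set once the cluster manager acknowledges registration; null until then.
  @volatile private var clusterAppId: String = null

  override def start(): Unit = { /* register with the cluster manager and record its app ID */ }
  override def stop(): Unit = { }
  override def reviveOffers(): Unit = { }
  override def defaultParallelism(): Int = 2

  // Option(...) maps a still-unset (null) ID to None, matching the trait's contract.
  override def applicationId(): Option[String] = Option(clusterAppId)
}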
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index f33c2e065a..86afe3bd52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
- extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
+ sparkUser: String) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1a0b877c8a..1c1ce666ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean
+
+ /**
+ * The application ID associated with the job, if any.
+ *
+ * @return The application ID, or None if the backend does not provide an ID.
+ */
+ def applicationId(): Option[String] = None
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ad051e59af..633e892554 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
+
+ override def applicationId(): Option[String] = backend.applicationId()
+
}
@@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2a3711ae2a..5b5257269d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- // Submit tasks only after (registered resources / total expected resources)
+ // Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
- // if minRegisteredRatio has not yet been reached
+ // if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index bc7670f4a8..513d74a08a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -69,4 +69,5 @@ private[spark] class SimrSchedulerBackend(
fs.delete(new Path(driverFilePath), false)
super.stop()
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 32138e5246..06872ace2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -34,6 +34,10 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
+ var appId: String = _
+
+ val registrationLock = new Object()
+ var registrationDone = false
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
val totalExpectedCores = maxCores.getOrElse(0)
@@ -68,6 +72,8 @@ private[spark] class SparkDeploySchedulerBackend(
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
+
+ waitForRegistration()
}
override def stop() {
@@ -81,15 +87,19 @@ private[spark] class SparkDeploySchedulerBackend(
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
+ this.appId = appId
+ notifyContext()
}
override def disconnected() {
+ notifyContext()
if (!stopping) {
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
}
}
override def dead(reason: String) {
+ notifyContext()
if (!stopping) {
logError("Application has been killed. Reason: " + reason)
scheduler.error(reason)
@@ -116,4 +126,22 @@ private[spark] class SparkDeploySchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = Option(appId)
+
+ private def waitForRegistration() = {
+ registrationLock.synchronized {
+ while (!registrationDone) {
+ registrationLock.wait()
+ }
+ }
+ }
+
+ private def notifyContext() = {
+ registrationLock.synchronized {
+ registrationDone = true
+ registrationLock.notifyAll()
+ }
+ }
+
}
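Aside (illustrative sketch, not patch code): the waitForRegistration()/notifyContext() pair added above is a standard monitor handshake, where start() blocks until the AppClient reports connected, disconnected, or dead. Reduced to its essentials the pattern is:

// One thread waits for registration; another signals completion.
class RegistrationLatch {
  private val lock = new Object()
  private var done = false

  def awaitRegistered(): Unit = lock.synchronized {
    while (!done) {
      lock.wait()  // the while loop guards against spurious wakeups
    }
  }

  def markRegistered(): Unit = lock.synchronized {
    done = true
    lock.notifyAll()
  }
}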
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 87e181e773..da43ef5676 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -309,4 +309,5 @@ private[spark] class CoarseMesosSchedulerBackend(
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 67ee4d66f1..a9ef126f5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -349,4 +349,5 @@ private[spark] class MesosSchedulerBackend(
// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index bec9502f20..9ea25c2bc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -114,4 +114,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index bee6dad338..f0006b42ae 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -232,7 +232,7 @@ private[spark] object UIUtils extends Logging {
def listingTable[T](
headers: Seq[String],
generateDataRow: T => Seq[Node],
- data: Seq[T],
+ data: Iterable[T],
fixedWidth: Boolean = false): Seq[Node] = {
var listingTableClass = TABLE_CLASS
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1fc536b096..b0754e3ce1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -171,6 +171,7 @@ private[spark] object JsonProtocol {
def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
("Event" -> Utils.getFormattedClassName(applicationStart)) ~
("App Name" -> applicationStart.appName) ~
+ ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser)
}
@@ -484,9 +485,10 @@ private[spark] object JsonProtocol {
def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
val appName = (json \ "App Name").extract[String]
+ val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, time, sparkUser)
+ SparkListenerApplicationStart(appName, appId, time, sparkUser)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
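Aside, with invented values: event logs written before this change simply have no "App ID" field, which is what the Utils.jsonOption call above tolerates. A small json4s sketch of an old-format event:

import org.json4s.jackson.JsonMethods.parse

object OldEventExample {
  // An ApplicationStart record as Spark 1.0.x would have written it: no "App ID" field.
  val oldEvent = parse("""
    {"Event": "SparkListenerApplicationStart",
     "App Name": "Greatest App (N)ever",
     "Timestamp": 125,
     "User": "Mickey"}""")

  // JsonProtocol.applicationStartFromJson(oldEvent) now yields
  // SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey"):
  // the missing field becomes None instead of aborting the replay, which is what
  // the "backwards compatibility" test added to JsonProtocolSuite checks.
}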
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 41e58a008c..fead883793 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -229,7 +229,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
val conf = getLoggingConf(logDirPath, compressionCodec)
val eventLogger = new EventLoggingListener("test", conf)
val listenerBus = new LiveListenerBus
- val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+ 125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 8f0ee9f4db..7ab351d1b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -83,7 +83,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val writer = new PrintWriter(cstream)
- val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+ 125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c84bafce37..2b45d8b695 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -60,7 +60,7 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -176,6 +176,13 @@ class JsonProtocolSuite extends FunSuite {
deserializedBmRemoved)
}
+ test("SparkListenerApplicationStart backwards compatibility") {
+ // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+ val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
+ .removeField({ _._1 == "App ID" })
+ assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
+ }
/** -------------------------- *
| Helper test running methods |
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a2f1b3582a..855d5cc8cf 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -111,6 +111,8 @@ object MimaExcludes {
MimaBuild.excludeSparkClass("storage.Values") ++
MimaBuild.excludeSparkClass("storage.Entry") ++
MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
+ // Class was missing "@DeveloperApi" annotation in 1.0.
+ MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
Seq(
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Gini.calculate"),
@@ -119,14 +121,14 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Variance.calculate")
) ++
- Seq ( // Package-private classes removed in SPARK-2341
+ Seq( // Package-private classes removed in SPARK-2341
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
- ) ++
+ ) ++
Seq( // package-private classes removed in MLlib
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8c54840971..98039a20de 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
final def run(): Int = {
+ val appAttemptId = client.getAttemptId()
+
if (isDriver) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
@@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Set the master property to match the requested mode.
System.setProperty("spark.master", "yarn-cluster")
+
+ // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+ System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}
- logInfo("ApplicationAttemptId: " + client.getAttemptId())
+ logInfo("ApplicationAttemptId: " + appAttemptId)
val cleanupHook = new Runnable {
override def run() {
@@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+ private def registerAM(uiAddress: String) = {
val sc = sparkContextRef.get()
+
+ val appId = client.getAttemptId().getApplicationId().toString()
+ val historyAddress =
+ sparkConf.getOption("spark.yarn.historyServer.address")
+ .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+ .getOrElse("")
+
allocator = client.register(yarnConf,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
- uiHistoryAddress)
+ historyAddress)
allocator.allocateResources()
reporterThread = launchReporterThread()
@@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
- registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+ registerAM(sc.ui.appUIHostPort)
try {
userThread.join()
} finally {
@@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
- sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
// In client mode the actor will stop the reporter thread.
reporterThread.join()
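Aside, with invented values: the history link the AM now registers is just the configured history server address with the UI path prefix and the YARN application ID appended, so the RM's tracking URL lands directly on the app's history entry. Roughly:

object HistoryLinkExample {
  // Illustrative only; the host and application ID below are made up.
  val historyServerAddress = "http://history-host:18080"   // spark.yarn.historyServer.address
  val appId = "application_1409736610210_0001"
  // HistoryServer.UI_PATH_PREFIX is "/history", so the link registered with the RM becomes:
  val historyAddress = s"$historyServerAddress/history/$appId"
  // => http://history-host:18080/history/application_1409736610210_0001
}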
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ffe2731ca1..dc77f12364 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -156,19 +155,6 @@ object YarnSparkHadoopUtil {
}
}
- def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
- val eventLogDir = sc.eventLogger match {
- case Some(logger) => logger.getApplicationLogDir()
- case None => ""
- }
- val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
- if (historyServerAddress != "" && eventLogDir != "") {
- historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
- } else {
- ""
- }
- }
-
/**
* Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
* using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a5f537dd9d..41c662cd7a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
- conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = Option(appId).map(_.toString())
+
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 55665220a6..39436d0999 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
var totalExpectedExecutors = 0
-
+
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
@@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+
}