-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 176
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala | 35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 9
-rw-r--r--  project/MimaExcludes.scala | 6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 24
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 5
25 files changed, 228 insertions, 133 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cb4fb7cfbd..529febff94 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {
/** Post the application start event */
private def postApplicationStart() {
- listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+ // Note: this code assumes that the task scheduler has been initialized and has contacted
+ // the cluster manager to get an application ID (in case the cluster manager provides one).
+ listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+ startTime, sparkUser))
}
/** Post the application end event */
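With this change the application start event carries the cluster manager's application ID when one is available. A minimal sketch, not part of this patch, of a listener that consumes the new field (the class name is hypothetical):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

    // Hypothetical listener that records the cluster-manager-provided application ID, if any.
    class AppIdCaptureListener extends SparkListener {
      @volatile var capturedAppId: Option[String] = None

      override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
        // appId is Option[String]: None when the backend does not provide an ID.
        capturedAppId = applicationStart.appId
      }
    }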
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index a0e8bd403a..fbe39b2764 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
*
* @return List of all known applications.
*/
- def getListing(): Seq[ApplicationHistoryInfo]
+ def getListing(): Iterable[ApplicationHistoryInfo]
/**
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
- * @return The application's UI, or null if application is not found.
+ * @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String): SparkUI
+ def getAppUI(appId: String): Option[SparkUI]
/**
* Called when the server is shutting down.
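With getAppUI() returning Option[SparkUI] and getListing() an Iterable, a provider needs no null handling. A minimal sketch assuming the trait's remaining members (such as stop()) keep their default implementations; the in-memory provider is hypothetical and, since the trait is private[spark], would have to live in the same package (org.apache.spark.deploy.history):

    import scala.collection.mutable
    import org.apache.spark.ui.SparkUI

    // Hypothetical provider backed by in-memory maps. getAppUI returns None for unknown IDs
    // instead of null, so callers can use map/getOrElse rather than null checks.
    class InMemoryHistoryProvider extends ApplicationHistoryProvider {
      private val listing = new mutable.LinkedHashMap[String, ApplicationHistoryInfo]()
      private val uis = new mutable.HashMap[String, SparkUI]()

      override def getListing(): Iterable[ApplicationHistoryInfo] = listing.values

      override def getAppUI(appId: String): Option[SparkUI] = uis.get(appId)
    }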
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 05c8a90782..481f6c93c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {
+ private val NOT_STARTED = "<Not Started>"
+
// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
- // List of applications, in order from newest to oldest.
- @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+ // The modification time of the newest log detected during the last scan. This is used
+ // to ignore logs that are older during subsequent scans, to avoid processing data that
+ // is already known.
+ private var lastModifiedTime = -1L
+
+ // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+ // into the map in order, so the LinkedHashMap maintains the correct ordering.
+ @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+ = new mutable.LinkedHashMap()
/**
* A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
logCheckingThread.start()
}
- override def getListing() = appList
+ override def getListing() = applications.values
- override def getAppUI(appId: String): SparkUI = {
+ override def getAppUI(appId: String): Option[SparkUI] = {
try {
- val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
- val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
- ui
+ applications.get(appId).map { info =>
+ val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+ new Path(logDir, info.logDir)))
+ val ui = {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ new SparkUI(conf, appSecManager, replayBus, appId,
+ s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+ // Do not call ui.bind() to avoid creating a new server for each application
+ }
+
+ replayBus.replay()
+
+ ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
} catch {
- case e: FileNotFoundException => null
+ case e: FileNotFoundException => None
}
}
@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs.filter { dir =>
- fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
- }
- val currentApps = Map[String, ApplicationHistoryInfo](
- appList.map(app => app.id -> app):_*)
-
- // For any application that either (i) is not listed or (ii) has changed since the last time
- // the listing was created (defined by the log dir's modification time), load the app's info.
- // Otherwise just reuse what's already in memory.
- val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
- for (dir <- logInfos) {
- val curr = currentApps.getOrElse(dir.getPath().getName(), null)
- if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+ // Load all new logs from the log directory. Only directories that have a modification time
+ // later than the last known log directory will be loaded.
+ var newLastModifiedTime = lastModifiedTime
+ val logInfos = logDirs
+ .filter { dir =>
+ if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+ val modTime = getModificationTime(dir)
+ newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+ modTime > lastModifiedTime
+ } else {
+ false
+ }
+ }
+ .flatMap { dir =>
try {
- val (app, _) = loadAppInfo(dir, renderUI = false)
- newApps += app
+ val (replayBus, appListener) = createReplayBus(dir)
+ replayBus.replay()
+ Some(new FsApplicationHistoryInfo(
+ dir.getPath().getName(),
+ appListener.appId.getOrElse(dir.getPath().getName()),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(dir),
+ appListener.sparkUser.getOrElse(NOT_STARTED)))
} catch {
- case e: Exception => logError(s"Failed to load app info from directory $dir.")
+ case e: Exception =>
+ logInfo(s"Failed to load application log data from $dir.", e)
+ None
+ }
+ }
+ .sortBy { info => -info.endTime }
+
+ lastModifiedTime = newLastModifiedTime
+
+ // When there are new logs, merge the new list with the existing one, maintaining
+ // the expected ordering (descending end time). Maintaining the order is important
+ // to avoid having to sort the list every time there is a request for the log list.
+ if (!logInfos.isEmpty) {
+ val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo) = {
+ if (!newApps.contains(info.id)) {
+ newApps += (info.id -> info)
}
- } else {
- newApps += curr
}
- }
- appList = newApps.sortBy { info => -info.endTime }
+ val newIterator = logInfos.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newIterator.head.endTime > oldIterator.head.endTime) {
+ addIfAbsent(newIterator.next)
+ } else {
+ addIfAbsent(oldIterator.next)
+ }
+ }
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = newApps
+ }
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}
- /**
- * Parse the application's logs to find out the information we need to build the
- * listing page.
- *
- * When creating the listing of available apps, there is no need to load the whole UI for the
- * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
- * clicks on a specific application.
- *
- * @param logDir Directory with application's log files.
- * @param renderUI Whether to create the SparkUI for the application.
- * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
- */
- private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
- val path = logDir.getPath
- val appId = path.getName
+ private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+ val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
-
- val ui: SparkUI = if (renderUI) {
- val conf = this.conf.clone()
- val appSecManager = new SecurityManager(conf)
- new SparkUI(conf, appSecManager, replayBus, appId,
- HistoryServer.UI_PATH_PREFIX + s"/$appId")
- // Do not call ui.bind() to avoid creating a new server for each application
- } else {
- null
- }
-
- replayBus.replay()
- val appInfo = ApplicationHistoryInfo(
- appId,
- appListener.appName,
- appListener.startTime,
- appListener.endTime,
- getModificationTime(logDir),
- appListener.sparkUser)
-
- if (ui != null) {
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
- ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- }
- (appInfo, ui)
+ (replayBus, appListener)
}
/** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
}
+
+private class FsApplicationHistoryInfo(
+ val logDir: String,
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String)
+ extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
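The listing update above merges two sequences that are each already sorted by descending end time, skipping IDs it has already seen, so the LinkedHashMap stays ordered without re-sorting on every scan. A standalone sketch of that merge using a simplified, hypothetical record type:

    import scala.collection.mutable

    case class AppInfo(id: String, endTime: Long)

    // Merge two lists sorted by descending endTime into a LinkedHashMap, dropping duplicate IDs.
    def mergeByEndTime(
        newApps: Seq[AppInfo],
        oldApps: Seq[AppInfo]): mutable.LinkedHashMap[String, AppInfo] = {
      val merged = new mutable.LinkedHashMap[String, AppInfo]()
      def addIfAbsent(info: AppInfo): Unit = {
        if (!merged.contains(info.id)) merged += (info.id -> info)
      }
      val newIt = newApps.iterator.buffered
      val oldIt = oldApps.iterator.buffered
      while (newIt.hasNext && oldIt.hasNext) {
        // Take whichever head ends later, preserving descending end-time order.
        if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next()) else addIfAbsent(oldIt.next())
      }
      newIt.foreach(addIfAbsent)
      oldIt.foreach(addIfAbsent)
      merged
    }

For example, merging Seq(AppInfo("b", 30)) with Seq(AppInfo("a", 40), AppInfo("c", 10)) yields the keys a, b, c, i.e. end times 40, 30, 10.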
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index d1a64c1912..ce00c0ffd2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
- val ui = provider.getAppUI(key)
- if (ui == null) {
- throw new NoSuchElementException()
- }
+ val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 162158babc..6d39a5e3fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
- var appName = "<Not Started>"
- var sparkUser = "<Not Started>"
- var startTime = -1L
- var endTime = -1L
- var viewAcls = ""
- var adminAcls = ""
-
- def applicationStarted = startTime != -1
-
- def applicationCompleted = endTime != -1
-
- def applicationDuration: Long = {
- val difference = endTime - startTime
- if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
- }
+ var appName: Option[String] = None
+ var appId: Option[String] = None
+ var sparkUser: Option[String] = None
+ var startTime: Option[Long] = None
+ var endTime: Option[Long] = None
+ var viewAcls: Option[String] = None
+ var adminAcls: Option[String] = None
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
- appName = applicationStart.appName
- startTime = applicationStart.time
- sparkUser = applicationStart.sparkUser
+ appName = Some(applicationStart.appName)
+ appId = applicationStart.appId
+ startTime = Some(applicationStart.time)
+ sparkUser = Some(applicationStart.sparkUser)
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
- endTime = applicationEnd.time
+ endTime = Some(applicationEnd.time)
}
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
val environmentDetails = environmentUpdate.environmentDetails
val allProperties = environmentDetails("Spark Properties").toMap
- viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
- adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+ viewAcls = allProperties.get("spark.ui.view.acls")
+ adminAcls = allProperties.get("spark.admin.acls")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index e41e0a9841..a0be8307ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true
+
+ /**
+ * The application ID associated with the job, if any.
+ *
+ * @return The application ID, or None if the backend does not provide an ID.
+ */
+ def applicationId(): Option[String] = None
+
}
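The default returns None, so only backends that actually learn an ID from their cluster manager override it, and the task scheduler simply delegates to its backend. A toy sketch of that delegation with made-up class names:

    // Hypothetical stand-ins showing the delegation chain: backend -> scheduler.
    trait MyBackend { def applicationId(): Option[String] = None }

    class RegisteredBackend(id: String) extends MyBackend {
      override def applicationId(): Option[String] = Option(id)  // Option(null) collapses to None
    }

    class MyScheduler(backend: MyBackend) {
      def applicationId(): Option[String] = backend.applicationId()
    }

Here new MyScheduler(new RegisteredBackend("app-20140918000000-0001")).applicationId() yields Some(...), while a backend that never registered yields None.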
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index f33c2e065a..86afe3bd52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
- extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
+ sparkUser: String) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1a0b877c8a..1c1ce666ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean
+
+ /**
+ * The application ID associated with the job, if any.
+ *
+ * @return The application ID, or None if the backend does not provide an ID.
+ */
+ def applicationId(): Option[String] = None
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ad051e59af..633e892554 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
+
+ override def applicationId(): Option[String] = backend.applicationId()
+
}
@@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2a3711ae2a..5b5257269d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- // Submit tasks only after (registered resources / total expected resources)
+ // Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, which is a double between 0 and 1.
var minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
- // if minRegisteredRatio has not yet been reached
+ // if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index bc7670f4a8..513d74a08a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -69,4 +69,5 @@ private[spark] class SimrSchedulerBackend(
fs.delete(new Path(driverFilePath), false)
super.stop()
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 32138e5246..06872ace2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -34,6 +34,10 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
+ var appId: String = _
+
+ val registrationLock = new Object()
+ var registrationDone = false
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
val totalExpectedCores = maxCores.getOrElse(0)
@@ -68,6 +72,8 @@ private[spark] class SparkDeploySchedulerBackend(
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
+
+ waitForRegistration()
}
override def stop() {
@@ -81,15 +87,19 @@ private[spark] class SparkDeploySchedulerBackend(
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
+ this.appId = appId
+ notifyContext()
}
override def disconnected() {
+ notifyContext()
if (!stopping) {
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
}
}
override def dead(reason: String) {
+ notifyContext()
if (!stopping) {
logError("Application has been killed. Reason: " + reason)
scheduler.error(reason)
@@ -116,4 +126,22 @@ private[spark] class SparkDeploySchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = Option(appId)
+
+ private def waitForRegistration() = {
+ registrationLock.synchronized {
+ while (!registrationDone) {
+ registrationLock.wait()
+ }
+ }
+ }
+
+ private def notifyContext() = {
+ registrationLock.synchronized {
+ registrationDone = true
+ registrationLock.notifyAll()
+ }
+ }
+
}
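start() now blocks in waitForRegistration() until one of the AppClient callbacks (connected, disconnected, or dead) calls notifyContext(), so under normal operation the backend knows its appId before the scheduler is treated as started. A standalone sketch of the same monitor-based handshake (names are illustrative):

    // One thread waits on the monitor until another thread signals that registration finished.
    class RegistrationLatch {
      private val lock = new Object()
      private var done = false

      def await(): Unit = lock.synchronized {
        while (!done) {
          lock.wait()  // releases the monitor; the loop guards against spurious wakeups
        }
      }

      def signal(): Unit = lock.synchronized {
        done = true
        lock.notifyAll()
      }
    }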
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 87e181e773..da43ef5676 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -309,4 +309,5 @@ private[spark] class CoarseMesosSchedulerBackend(
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 67ee4d66f1..a9ef126f5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -349,4 +349,5 @@ private[spark] class MesosSchedulerBackend(
// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index bec9502f20..9ea25c2bc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -114,4 +114,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index bee6dad338..f0006b42ae 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -232,7 +232,7 @@ private[spark] object UIUtils extends Logging {
def listingTable[T](
headers: Seq[String],
generateDataRow: T => Seq[Node],
- data: Seq[T],
+ data: Iterable[T],
fixedWidth: Boolean = false): Seq[Node] = {
var listingTableClass = TABLE_CLASS
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1fc536b096..b0754e3ce1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -171,6 +171,7 @@ private[spark] object JsonProtocol {
def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
("Event" -> Utils.getFormattedClassName(applicationStart)) ~
("App Name" -> applicationStart.appName) ~
+ ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser)
}
@@ -484,9 +485,10 @@ private[spark] object JsonProtocol {
def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
val appName = (json \ "App Name").extract[String]
+ val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, time, sparkUser)
+ SparkListenerApplicationStart(appName, appId, time, sparkUser)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
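Writing the optional field as JNothing when absent and reading it back through Utils.jsonOption keeps event logs written before this change (which lack "App ID") parseable. A small self-contained json4s sketch of the same pattern, independent of Spark's JsonProtocol:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    implicit val formats = DefaultFormats

    // Absent values render as JNothing, which json4s omits from the serialized object.
    def appStartToJson(appName: String, appId: Option[String]): JValue =
      ("App Name" -> appName) ~
      ("App ID" -> appId.map(JString(_)).getOrElse(JNothing))

    // Mirrors Utils.jsonOption: treat a missing field as None instead of failing.
    def jsonOption(json: JValue): Option[JValue] = json match {
      case JNothing => None
      case value => Some(value)
    }

    def appIdFromJson(json: JValue): Option[String] =
      jsonOption(json \ "App ID").map(_.extract[String])

Here compact(render(appStartToJson("app", None))) produces {"App Name":"app"}, and appIdFromJson(parse("""{"App Name":"app"}""")) returns None, matching the backwards-compatibility test added later in this patch.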
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 41e58a008c..fead883793 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -229,7 +229,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
val conf = getLoggingConf(logDirPath, compressionCodec)
val eventLogger = new EventLoggingListener("test", conf)
val listenerBus = new LiveListenerBus
- val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+ 125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 8f0ee9f4db..7ab351d1b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -83,7 +83,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val writer = new PrintWriter(cstream)
- val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+ 125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c84bafce37..2b45d8b695 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -60,7 +60,7 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -176,6 +176,13 @@ class JsonProtocolSuite extends FunSuite {
deserializedBmRemoved)
}
+ test("SparkListenerApplicationStart backwards compatibility") {
+ // SparkListenerApplicationStart in Spark 1.0.0 did not have an "appId" property.
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+ val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
+ .removeField({ _._1 == "App ID" })
+ assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
+ }
/** -------------------------- *
| Helper test running methods |
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a2f1b3582a..855d5cc8cf 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -111,6 +111,8 @@ object MimaExcludes {
MimaBuild.excludeSparkClass("storage.Values") ++
MimaBuild.excludeSparkClass("storage.Entry") ++
MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
+ // Class was missing "@DeveloperApi" annotation in 1.0.
+ MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
Seq(
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Gini.calculate"),
@@ -119,14 +121,14 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Variance.calculate")
) ++
- Seq ( // Package-private classes removed in SPARK-2341
+ Seq( // Package-private classes removed in SPARK-2341
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
- ) ++
+ ) ++
Seq( // package-private classes removed in MLlib
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8c54840971..98039a20de 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
final def run(): Int = {
+ val appAttemptId = client.getAttemptId()
+
if (isDriver) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
@@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Set the master property to match the requested mode.
System.setProperty("spark.master", "yarn-cluster")
+
+ // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+ System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}
- logInfo("ApplicationAttemptId: " + client.getAttemptId())
+ logInfo("ApplicationAttemptId: " + appAttemptId)
val cleanupHook = new Runnable {
override def run() {
@@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+ private def registerAM(uiAddress: String) = {
val sc = sparkContextRef.get()
+
+ val appId = client.getAttemptId().getApplicationId().toString()
+ val historyAddress =
+ sparkConf.getOption("spark.yarn.historyServer.address")
+ .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+ .getOrElse("")
+
allocator = client.register(yarnConf,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
- uiHistoryAddress)
+ historyAddress)
allocator.allocateResources()
reporterThread = launchReporterThread()
@@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
- registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+ registerAM(sc.ui.appUIHostPort)
try {
userThread.join()
} finally {
@@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
- sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
// In client mode the actor will stop the reporter thread.
reporterThread.join()
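Instead of receiving a pre-built history address, the ApplicationMaster now assembles it from spark.yarn.historyServer.address, HistoryServer.UI_PATH_PREFIX, and the application ID. A standalone sketch of that construction with made-up values (when the address is not configured, the result is the empty string):

    val uiPathPrefix = "/history"                      // value of HistoryServer.UI_PATH_PREFIX
    val appId = "application_1410000000000_0001"       // example YARN application ID
    val configuredAddress: Option[String] = Some("http://history-host:18080")

    val historyAddress = configuredAddress
      .map { address => s"$address$uiPathPrefix/$appId" }
      .getOrElse("")

    // historyAddress == "http://history-host:18080/history/application_1410000000000_0001"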
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ffe2731ca1..dc77f12364 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -156,19 +155,6 @@ object YarnSparkHadoopUtil {
}
}
- def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
- val eventLogDir = sc.eventLogger match {
- case Some(logger) => logger.getApplicationLogDir()
- case None => ""
- }
- val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
- if (historyServerAddress != "" && eventLogDir != "") {
- historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
- } else {
- ""
- }
- }
-
/**
* Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
* using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a5f537dd9d..41c662cd7a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
- conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = Option(appId).map(_.toString())
+
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 55665220a6..39436d0999 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
var totalExpectedExecutors = 0
-
+
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
@@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+
}