author    Marcelo Vanzin <vanzin@cloudera.com>    2015-06-30 14:01:52 -0700
committer Andrew Or <andrew@databricks.com>       2015-06-30 14:01:52 -0700
commit    4bb8375fc2c6aa8342df03c3617aa97e7d01de3f (patch)
tree      91a8c320c0d44cbf3c53c5723be61d7101c3a483 /core/src/main/scala/org
parent    7dda0844e1eb6df7455af68592751806b3b92251 (diff)
[SPARK-8372] Do not show applications that haven't recorded their app ID yet.
Showing these applications may lead to weird behavior in the History Server. For old logs, if the app ID is recorded later, you may end up with a duplicate entry. For new logs, the app might be listed with a ".inprogress" suffix. So ignore those, but still allow old applications that don't record app IDs at all (1.0 and 1.1) to be shown.

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Carson Wang <carson.wang@intel.com>

Closes #7097 from vanzin/SPARK-8372 and squashes the following commits:

a24eab2 [Marcelo Vanzin] Feedback.
112ae8f [Marcelo Vanzin] Merge branch 'master' into SPARK-8372
7b91b74 [Marcelo Vanzin] Handle logs generated by 1.0 and 1.1.
1eca3fe [Carson Wang] [SPARK-8372] History server shows incorrect information for application not started
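In short, the new rule is: list an attempt only once its app ID is known, or when the log was written by a release that never recorded app IDs at all. A minimal sketch of that predicate (ListingRule and shouldList are illustrative names, not the provider's actual API):

    object ListingRule {
      // List a log only once its application ID is known, unless it was
      // written by Spark 1.0/1.1, which never recorded app IDs at all.
      def shouldList(appId: Option[String], logPredatesAppIds: Boolean): Boolean =
        appId.isDefined || logPredatesAppIds
    }

This mirrors the appListener.appId.isDefined || !sparkVersionHasAppId(eventLog) check added in the diff below.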
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 98
1 file changed, 65 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88f32..2cc465e55f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
- // Constants used to parse Spark 1.0.0 log directories.
- private[history] val LOG_PREFIX = "EVENT_LOG_"
- private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
- private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
- private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -146,7 +140,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
- appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
-
val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
- ui.setAppName(s"${appInfo.name} ($appId)")
-
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(attempt.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ appInfo.map { info =>
+ ui.setAppName(s"${info.name} ($appId)")
+
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
}
} catch {
@@ -282,8 +276,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
- logInfo(s"Application log ${res.logPath} loaded successfully.")
- Some(res)
+ res match {
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ }
+ res
} catch {
case e: Exception =>
logError(
@@ -429,9 +427,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replays the events in the specified log file and returns information about the associated
- * application.
+ * application. Return `None` if the application ID cannot be located.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+ private def replay(
+ eventLog: FileStatus,
+ bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -445,16 +445,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted)
+
+ // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+ // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
+ // logs generated by those versions go through.
+ if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+ Some(new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(eventLog).get,
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted))
+ } else {
+ None
+ }
} finally {
logInput.close()
}
@@ -529,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ /**
+ * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
+ * in Spark 1.1.
+ */
+ private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+ if (isLegacyLogDirectory(entry)) {
+ fs.listStatus(entry.getPath())
+ .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+ .map { status =>
+ val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+ version != "1.0" && version != "1.1"
+ }
+ .getOrElse(true)
+ } else {
+ true
+ }
+ }
+
}
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ // Constants used to parse Spark 1.0.0 log directories.
+ val LOG_PREFIX = "EVENT_LOG_"
+ val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+ val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+ val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}
private class FsApplicationAttemptInfo(
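
A note on the map-to-flatMap change in getAppUI: replay() now returns an Option, so the closure's body is itself optional, and map would produce a nested Option[Option[SparkUI]]. A standalone sketch of the same pattern, with placeholder types rather than the real Spark classes:

    object OptionChaining {
      final case class Attempt(id: String, appId: Option[String]) // stand-in for FsApplicationAttemptInfo
      final case class Ui(appName: String)                        // stand-in for SparkUI

      // Mirrors the new replay(): yields None when the log never recorded an app ID.
      def replay(attempt: Attempt): Option[String] = attempt.appId

      // The closure's result is itself an Option[Ui], so the outer combinator
      // must be flatMap; with map the result would be Option[Option[Ui]].
      def getAppUi(attempts: Seq[Attempt], wanted: String): Option[Ui] =
        attempts.find(_.id == wanted).flatMap { attempt =>
          replay(attempt).map(appId => Ui(s"$appId UI"))
        }
    }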
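The version gate in sparkVersionHasAppId boils down to string manipulation on a legacy marker file name. A reduced sketch, assuming a marker file named like "SPARK_VERSION_1.0" (VersionGate is an illustrative wrapper, not part of Spark):

    object VersionGate {
      val SPARK_VERSION_PREFIX = "SPARK_VERSION_"

      // A legacy (Spark 1.0.0-style) log directory contains a marker file such
      // as "SPARK_VERSION_1.0"; versions 1.0 and 1.1 predate app IDs in logs.
      def versionHasAppId(markerFileName: String): Boolean = {
        val version = markerFileName.substring(SPARK_VERSION_PREFIX.length)
        version != "1.0" && version != "1.1"
      }
    }

For example, VersionGate.versionHasAppId("SPARK_VERSION_1.0") is false, so such a log is listed even without an app ID, while any later version must have recorded one.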