author     Marcelo Vanzin <vanzin@cloudera.com>    2015-06-30 14:01:52 -0700
committer  Andrew Or <andrew@databricks.com>       2015-06-30 14:05:53 -0700
commit     1b5439f6e809da7389993244f484692fb9ffb43f (patch)
tree       6eb3fce3d75d9966b9963cdcdc0e10470cff8564
parent     bc355e24368123baca5335ddf5560ded1da11141 (diff)
[SPARK-8372] Do not show applications that haven't recorded their app ID yet.
Showing these applications may lead to weird behavior in the History Server. For old logs, if the app ID is recorded later, you may end up with a duplicate entry. For new logs, the app might be listed with a ".inprogress" suffix. So ignore those, but still allow old applications that don't record app IDs at all (1.0 and 1.1) to be shown.

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Carson Wang <carson.wang@intel.com>

Closes #7097 from vanzin/SPARK-8372 and squashes the following commits:

a24eab2 [Marcelo Vanzin] Feedback.
112ae8f [Marcelo Vanzin] Merge branch 'master' into SPARK-8372
7b91b74 [Marcelo Vanzin] Handle logs generated by 1.0 and 1.1.
1eca3fe [Carson Wang] [SPARK-8372] History server shows incorrect information for application not started

Conflicts:
	core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala       |  98
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala  | 109
2 files changed, 147 insertions(+), 60 deletions(-)
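
The commit message above boils down to one listing rule applied while replaying a log. A minimal, self-contained sketch of that rule (the object and method names here are hypothetical and not part of the FsHistoryProvider API shown below):

object ListingRule {
  // Hypothetical model of the rule introduced by this patch: an attempt is listed
  // only if the replayed log recorded an app ID, or if it was written by a Spark
  // release (1.0 / 1.1) that never recorded app IDs at all.
  def shouldListAttempt(appId: Option[String], logSparkVersion: Option[String]): Boolean = {
    val versionRecordsAppIds = logSparkVersion match {
      case Some("1.0") | Some("1.1") => false // these releases did not write app IDs
      case _ => true                          // new-style logs and anything newer
    }
    appId.isDefined || !versionRecordsAppIds
  }
}

// shouldListAttempt(Some("app-123"), None)  => true  (new log with an app ID)
// shouldListAttempt(None, None)             => false (new log whose app never started)
// shouldListAttempt(None, Some("1.0"))      => true  (1.0 log; app IDs did not exist yet)

In the actual change, the app ID comes from ApplicationEventListener after replay, and the version comes from the SPARK_VERSION_ marker file that legacy log directories contain (see sparkVersionHasAppId in the diff below).
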
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be34c8..3daa653589 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -80,12 +80,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
- // Constants used to parse Spark 1.0.0 log directories.
- private[history] val LOG_PREFIX = "EVENT_LOG_"
- private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
- private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
- private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -143,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
- appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
@@ -152,20 +146,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
-
val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
- ui.setAppName(s"${appInfo.name} ($appId)")
-
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(attempt.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ appInfo.map { info =>
+ ui.setAppName(s"${info.name} ($appId)")
+
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
}
} catch {
@@ -227,8 +221,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
- logInfo(s"Application log ${res.logPath} loaded successfully.")
- Some(res)
+ res match {
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ }
+ res
} catch {
case e: Exception =>
logError(
@@ -374,9 +372,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replays the events in the specified log file and returns information about the associated
- * application.
+ * application. Return `None` if the application ID cannot be located.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+ private def replay(
+ eventLog: FileStatus,
+ bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -390,16 +390,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted)
+
+ // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+ // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
+ // logs generated by those versions go through.
+ if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+ Some(new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(eventLog).get,
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted))
+ } else {
+ None
+ }
} finally {
logInput.close()
}
@@ -474,10 +482,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ /**
+ * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
+ * in Spark 1.1.
+ */
+ private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+ if (isLegacyLogDirectory(entry)) {
+ fs.listStatus(entry.getPath())
+ .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+ .map { status =>
+ val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+ version != "1.0" && version != "1.1"
+ }
+ .getOrElse(true)
+ } else {
+ true
+ }
+ }
+
}
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ // Constants used to parse Spark 1.0.0 log directories.
+ val LOG_PREFIX = "EVENT_LOG_"
+ val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+ val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+ val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}
private class FsApplicationAttemptInfo(
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933df9e..afa4958172 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -35,6 +35,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+ import FsHistoryProvider._
+
private var testDir: File = null
before {
@@ -63,7 +65,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
+ None),
SparkListenerApplicationEnd(5L)
)
@@ -71,35 +74,30 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
+ 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
+ SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
+ None)
)
// Write an old-style application log.
- val oldAppComplete = new File(testDir, "old1")
- oldAppComplete.mkdir()
- createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+ val oldAppComplete = writeOldLog("old1", "1.0", None, true,
+ SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
// Check for logs so that we force the older unfinished app to be loaded, to make
// sure unfinished apps are also sorted correctly.
provider.checkForLogs()
// Write an unfinished app, old-style.
- val oldAppIncomplete = new File(testDir, "old2")
- oldAppIncomplete.mkdir()
- createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+ val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
+ SparkListenerApplicationStart("old2", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -120,16 +118,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
- list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
newAppComplete.lastModified(), "test", true))
- list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
- true))
- list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
+ 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+ list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
oldAppComplete.lastModified(), "test", true))
- list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
- oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
+ -1L, oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -151,12 +148,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
val logDir = new File(testDir, codecName)
logDir.mkdir()
- createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
+ createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+ createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
val logPath = new Path(logDir.getAbsolutePath())
try {
@@ -176,12 +173,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
@@ -214,6 +211,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("Parse logs that application is not started") {
+ val provider = new FsHistoryProvider((createTestConf()))
+
+ val logFile1 = newLogFile("app1", None, inProgress = true)
+ writeFile(logFile1, true, None,
+ SparkListenerLogStart("1.4")
+ )
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ }
+
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
@@ -335,6 +344,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
+ test("SPARK-8372: new logs with no app ID are ignored") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ // Write a new log file without an app id, to make sure it's ignored.
+ val logFile1 = newLogFile("app1", None, inProgress = true)
+ writeFile(logFile1, true, None,
+ SparkListenerLogStart("1.4")
+ )
+
+ // Write a 1.2 log file with no start event (= no app id), it should be ignored.
+ writeOldLog("v12Log", "1.2", None, false)
+
+ // Write 1.0 and 1.1 logs, which don't have app ids.
+ writeOldLog("v11Log", "1.1", None, true,
+ SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
+ SparkListenerApplicationEnd(3L))
+ writeOldLog("v10Log", "1.0", None, true,
+ SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
+ SparkListenerApplicationEnd(4L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list(0).id should be ("v10Log")
+ list(1).id should be ("v11Log")
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
@@ -374,4 +410,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}
+ private def writeOldLog(
+ fname: String,
+ sparkVersion: String,
+ codec: Option[CompressionCodec],
+ completed: Boolean,
+ events: SparkListenerEvent*): File = {
+ val log = new File(testDir, fname)
+ log.mkdir()
+
+ val oldEventLog = new File(log, LOG_PREFIX + "1")
+ createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
+ writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
+ if (completed) {
+ createEmptyFile(new File(log, APPLICATION_COMPLETE))
+ }
+
+ log
+ }
+
}
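
For context on the legacy-log handling above: a pre-1.2 event log is a directory of marker files rather than a single file. Assuming a completed 1.0 application, the layout produced by the writeOldLog test helper (and inspected by sparkVersionHasAppId) looks roughly like this; the file name prefixes are the constants moved into the FsHistoryProvider companion object:

v10Log/
    EVENT_LOG_1              serialized listener events
    SPARK_VERSION_1.0        empty marker; the suffix after "SPARK_VERSION_" is what sparkVersionHasAppId parses
    APPLICATION_COMPLETE     empty marker, created only when completed = true

sparkVersionHasAppId lists the directory, finds the SPARK_VERSION_ entry, and returns false only for "1.0" and "1.1"; combined with the check in replay(), an app-ID-less log is therefore listed only when it came from one of those two releases.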