 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala      |  98
 core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 109
 2 files changed, 147 insertions(+), 60 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88f32..2cc465e55f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
- // Constants used to parse Spark 1.0.0 log directories.
- private[history] val LOG_PREFIX = "EVENT_LOG_"
- private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
- private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
- private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -146,7 +140,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
- appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+ appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
-
val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
- ui.setAppName(s"${appInfo.name} ($appId)")
-
- val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- ui.getSecurityManager.setAcls(uiAclsEnabled)
- // make sure to set admin acls before view acls so they are properly picked up
- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
- ui.getSecurityManager.setViewAcls(attempt.sparkUser,
- appListener.viewAcls.getOrElse(""))
- ui
+ appInfo.map { info =>
+ ui.setAppName(s"${info.name} ($appId)")
+
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setAcls(uiAclsEnabled)
+ // make sure to set admin acls before view acls so they are properly picked up
+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+ appListener.viewAcls.getOrElse(""))
+ ui
+ }
}
}
} catch {
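
For context on the `map` -> `flatMap` switch above: `replay` now returns an `Option` (see the `replay` hunk further down), so mapping an Option-returning function over `attempts.find(...)` would produce a nested `Option[Option[SparkUI]]`; `flatMap` collapses it so a missing app ID simply yields `None`. A minimal standalone sketch of the idiom (the `load` helper is hypothetical, not Spark code):

    // Stand-in for replay(): may fail to produce a result.
    def load(path: String): Option[String] =
      if (path.nonEmpty) Some(s"ui-for-$path") else None

    val viaMap: Option[Option[String]] = Some("logs").map(load)     // nested Option
    val viaFlatMap: Option[String]     = Some("logs").flatMap(load) // flattened
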
@@ -282,8 +276,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
- logInfo(s"Application log ${res.logPath} loaded successfully.")
- Some(res)
+ res match {
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ }
+ res
} catch {
case e: Exception =>
logError(
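
The enclosing `logs.flatMap { ... }` relies on the same flattening: attempts for which `replay` returned `None` are dropped from `newAttempts` rather than being listed. A hedged sketch of that collection idiom (names illustrative, not the provider's API):

    // flatMap over an Option-returning parser silently drops failed entries.
    def parse(name: String): Option[String] =
      if (name.nonEmpty) Some(name.toUpperCase) else None

    val loaded = Seq("app-1", "", "app-2").flatMap(parse)
    // loaded == Seq("APP-1", "APP-2"); the unparseable entry disappears
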
@@ -429,9 +427,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replays the events in the specified log file and returns information about the associated
- * application.
+ * application. Returns `None` if the application ID cannot be located.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+ private def replay(
+ eventLog: FileStatus,
+ bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -445,16 +445,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted)
+
+ // Without an app ID, new logs will render incorrectly on the listing page, so do not list
+ // them or try to show their UI. Some old versions of Spark generated logs without an app ID,
+ // so let logs from those versions through.
+ if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+ Some(new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(eventLog).get,
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted))
+ } else {
+ None
+ }
} finally {
logInput.close()
}
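
The condition guarding the `Some(...)` above reads as: list the log unless a version of Spark that records app IDs failed to write one. A small truth-table sketch of the same predicate, with hypothetical stand-ins for `appListener.appId` and `sparkVersionHasAppId(eventLog)`:

    def shouldList(appId: Option[String], versionRecordsAppIds: Boolean): Boolean =
      appId.isDefined || !versionRecordsAppIds

    assert(shouldList(Some("app-1"), versionRecordsAppIds = true))  // modern log with an ID
    assert(shouldList(None, versionRecordsAppIds = false))          // legacy 1.0/1.1 log
    assert(!shouldList(None, versionRecordsAppIds = true))          // modern log missing its ID
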
@@ -529,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ /**
+ * Returns whether the version of Spark that generated these logs records app IDs. App IDs
+ * were recorded starting with Spark 1.2, so logs from 1.0 and 1.1 do not have them.
+ */
+ private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+ if (isLegacyLogDirectory(entry)) {
+ fs.listStatus(entry.getPath())
+ .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+ .map { status =>
+ val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+ version != "1.0" && version != "1.1"
+ }
+ .getOrElse(true)
+ } else {
+ true
+ }
+ }
+
}
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ // Constants used to parse Spark 1.0.0 log directories.
+ val LOG_PREFIX = "EVENT_LOG_"
+ val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+ val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+ val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}
private class FsApplicationAttemptInfo(
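
For illustration, `sparkVersionHasAppId` works by scanning a legacy log directory for its version marker file and comparing the suffix. A self-contained sketch, assuming `EventLoggingListener.SPARK_VERSION_KEY` resolves to "SPARK_VERSION" (file names below are illustrative):

    val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
    val entries = Seq("EVENT_LOG_1", "SPARK_VERSION_1.0", "APPLICATION_COMPLETE")

    val recordsAppIds = entries
      .find(_.startsWith(SPARK_VERSION_PREFIX))
      .map(_.substring(SPARK_VERSION_PREFIX.length)) // Some("1.0")
      .map(v => v != "1.0" && v != "1.1")            // false: 1.0 logs lack app IDs
      .getOrElse(true)                               // no marker: assume a modern log
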
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 09075eeb53..2a62450bcd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+ import FsHistoryProvider._
+
private var testDir: File = null
before {
@@ -67,7 +69,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
+ None),
SparkListenerApplicationEnd(5L)
)
@@ -75,35 +78,30 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
+ 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
+ SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
+ None)
)
// Write an old-style application log.
- val oldAppComplete = new File(testDir, "old1")
- oldAppComplete.mkdir()
- createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+ val oldAppComplete = writeOldLog("old1", "1.0", None, true,
+ SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
// Check for logs so that we force the older unfinished app to be loaded, to make
// sure unfinished apps are also sorted correctly.
provider.checkForLogs()
// Write an unfinished app, old-style.
- val oldAppIncomplete = new File(testDir, "old2")
- oldAppIncomplete.mkdir()
- createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+ val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
+ SparkListenerApplicationStart("old2", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
- list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
newAppComplete.lastModified(), "test", true))
- list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
- true))
- list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
+ 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+ list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
oldAppComplete.lastModified(), "test", true))
- list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
- oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
+ -1L, oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -155,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
val logDir = new File(testDir, codecName)
logDir.mkdir()
- createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
+ createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+ createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
val logPath = new Path(logDir.getAbsolutePath())
try {
@@ -180,12 +177,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
@@ -218,6 +215,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("Parse logs that application is not started") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ val logFile1 = newLogFile("app1", None, inProgress = true)
+ writeFile(logFile1, true, None,
+ SparkListenerLogStart("1.4")
+ )
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ }
+
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
@@ -373,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("SPARK-8372: new logs with no app ID are ignored") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ // Write a new log file without an app id, to make sure it's ignored.
+ val logFile1 = newLogFile("app1", None, inProgress = true)
+ writeFile(logFile1, true, None,
+ SparkListenerLogStart("1.4")
+ )
+
+ // Write a 1.2 log file with no start event (and thus no app ID); it should be ignored.
+ writeOldLog("v12Log", "1.2", None, false)
+
+ // Write 1.0 and 1.1 logs, which don't have app ids.
+ writeOldLog("v11Log", "1.1", None, true,
+ SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
+ SparkListenerApplicationEnd(3L))
+ writeOldLog("v10Log", "1.0", None, true,
+ SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
+ SparkListenerApplicationEnd(4L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list(0).id should be ("v10Log")
+ list(1).id should be ("v11Log")
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
@@ -412,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}
+ private def writeOldLog(
+ fname: String,
+ sparkVersion: String,
+ codec: Option[CompressionCodec],
+ completed: Boolean,
+ events: SparkListenerEvent*): File = {
+ val log = new File(testDir, fname)
+ log.mkdir()
+
+ val oldEventLog = new File(log, LOG_PREFIX + "1")
+ createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
+ writeFile(oldEventLog, false, codec, events: _*)
+ if (completed) {
+ createEmptyFile(new File(log, APPLICATION_COMPLETE))
+ }
+
+ log
+ }
+
}
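
For reference, an illustrative use of the `writeOldLog` helper above, mirroring the earlier tests: it builds a Spark 1.0-style directory containing a version marker, a serialized event log, and (for completed apps) a completion marker:

    val oldLog = writeOldLog("old1", "1.0", None, completed = true,
      SparkListenerApplicationStart("old1", None, 2L, "test", None),
      SparkListenerApplicationEnd(3L))
    // Layout: old1/SPARK_VERSION_1.0, old1/EVENT_LOG_1, old1/APPLICATION_COMPLETE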