author    Carson Wang <carson.wang@intel.com>    2015-06-17 13:41:36 -0700
committer Andrew Or <andrew@databricks.com>      2015-06-17 13:42:36 -0700
commit    2837e067099921dd4ab6639ac5f6e89f789d4ff4 (patch)
tree      4b926ba1426037107e144c28151ec1aa80d3fc01
parent    7ad8c5d869555b1bf4b50eafdf80e057a0175941 (diff)
[SPARK-8372] History server shows incorrect information for application not started
The history server may show an incorrect App ID for an incomplete application, e.g. <App ID>.inprogress. This app info never disappears, even after the application completes.

![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png)

The cause of the issue is that the log path name is used as the app ID when the app ID cannot be obtained during replay.

Author: Carson Wang <carson.wang@intel.com>

Closes #6827 from carsonwang/SPARK-8372 and squashes the following commits:

cdbb089 [Carson Wang] Fix code style
3e46b35 [Carson Wang] Update code style
90f5dde [Carson Wang] Add a unit test
d8c9cd0 [Carson Wang] Replaying events only return information when app is started
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala       38
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala  43
2 files changed, 53 insertions(+), 28 deletions(-)
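The shape of the fix: `replay` now returns an `Option` that is `None` when the log contains no application start event (and hence no app ID), and callers skip such logs instead of inventing an ID from the log path. Below is a minimal, self-contained sketch of that pattern only; `AttemptInfo` and the inlined `replay` are hypothetical stand-ins for illustration, not Spark's actual `FsApplicationAttemptInfo` or `FsHistoryProvider.replay`.

```scala
// Minimal sketch of the fix's pattern; AttemptInfo is a hypothetical stand-in
// for FsApplicationAttemptInfo, not Spark's actual class.
object ReplaySketch extends App {
  case class AttemptInfo(logPath: String, appId: String, completed: Boolean)

  // Return Some(info) only when the log recorded an application ID; a log whose
  // application never started yields None instead of a fake ID derived from the
  // log path.
  def replay(logPath: String, recordedAppId: Option[String], completed: Boolean): Option[AttemptInfo] =
    recordedAppId.map(appId => AttemptInfo(logPath, appId, completed))

  // Callers flatMap over the result, so ID-less logs are dropped rather than
  // listed under their path name.
  val attempts = Seq(
    ("app-1.inprogress", None, false),          // never started: dropped
    ("app-2", Some("app-20150617-0002"), true)  // started: kept
  ).flatMap { case (path, id, done) => replay(path, id, done) }

  println(attempts) // List(AttemptInfo(app-2,app-20150617-0002,true))
}
```

The same flatMap/foreach handling appears in the actual diff: the UI setup only sets the app name when an attempt is returned, and getNewLastScanTime-era log scanning keeps `Some` results and logs a warning for `None`.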
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88f32..db383b9823 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
- ui.setAppName(s"${appInfo.name} ($appId)")
+ appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
- logInfo(s"Application log ${res.logPath} loaded successfully.")
- Some(res)
+ res match {
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ }
+ res
} catch {
case e: Exception =>
logError(
@@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replays the events in the specified log file and returns information about the associated
- * application.
+ * application. Return `None` if the application ID cannot be located.
*/
- private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+ private def replay(
+ eventLog: FileStatus,
+ bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
- new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted)
+ appListener.appId.map { appId =>
+ new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appId,
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ getModificationTime(eventLog).get,
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted)
+ }
} finally {
logInput.close()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 09075eeb53..d3a6db5f26 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -67,7 +67,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(
+ "new-app-complete", Some("new-app-complete"), 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
@@ -75,13 +76,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
+ SparkListenerApplicationStart(
+ "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
+ SparkListenerApplicationStart(
+ "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
)
// Write an old-style application log.
@@ -89,7 +92,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+ SparkListenerApplicationStart(
+ "old-app-complete", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+ SparkListenerApplicationStart(
+ "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
- list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
- list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+ list(1) should be (makeAppInfo("new-app-compressed-complete",
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true))
- list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
- list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+ list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test", None),
+ SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+ SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
@@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("Parse logs that application is not started") {
+ val provider = new FsHistoryProvider((createTestConf()))
+
+ val logFile1 = newLogFile("app1", None, inProgress = true)
+ writeFile(logFile1, true, None,
+ SparkListenerLogStart("1.4")
+ )
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ }
+
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())