From ea88b1a5077e6ba980b0de6d3bc508c62285ba4c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 29 Jun 2015 10:52:05 -0700 Subject: Revert "[SPARK-8372] History server shows incorrect information for application not started" This reverts commit 2837e067099921dd4ab6639ac5f6e89f789d4ff4. --- .../spark/deploy/history/FsHistoryProvider.scala | 38 +++++++++------------- 1 file changed, 15 insertions(+), 23 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index db383b9823..5427a88f32 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) replayBus.addListener(appListener) val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) - appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") } + ui.setAppName(s"${appInfo.name} ($appId)") val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setAcls(uiAclsEnabled) @@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newAttempts = logs.flatMap { fileStatus => try { val res = replay(fileStatus, bus) - res match { - case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") - case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + - "The application may have not started.") - } - res + logInfo(s"Application log ${res.logPath} loaded successfully.") + Some(res) } catch { case e: Exception => logError( @@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Replays the events in the specified log file and returns information about the associated - * application. Return `None` if the application ID cannot be located. + * application. */ - private def replay( - eventLog: FileStatus, - bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { + private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") val logInput = @@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appCompleted = isApplicationCompleted(eventLog) bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted) - appListener.appId.map { appId => - new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appId, - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - getModificationTime(eventLog).get, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted) - } + new FsApplicationAttemptInfo( + logPath.getName(), + appListener.appName.getOrElse(NOT_STARTED), + appListener.appId.getOrElse(logPath.getName()), + appListener.appAttemptId, + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), + getModificationTime(eventLog).get, + appListener.sparkUser.getOrElse(NOT_STARTED), + appCompleted) } finally { logInput.close() } -- cgit v1.2.3