aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMasayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp>2015-01-07 07:32:16 -0800
committerAndrew Or <andrew@databricks.com>2015-01-07 07:32:53 -0800
commit6e74edeca31acd7dc84a34402e430e017591d858 (patch)
tree2a72a4e51c839eb47008faa6829ac22b2298efa5 /core
parent8fdd48959c93b9cf809f03549e2ae6c4687d1fcd (diff)
downloadspark-6e74edeca31acd7dc84a34402e430e017591d858.tar.gz
spark-6e74edeca31acd7dc84a34402e430e017591d858.tar.bz2
spark-6e74edeca31acd7dc84a34402e430e017591d858.zip
[SPARK-2458] Make failed application log visible on History Server
Enabled HistoryServer to show incomplete applications. We can see the log for incomplete applications by clicking the bottom link. Author: Masayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp> Closes #3467 from tsudukim/feature/SPARK-2458-2 and squashes the following commits: 76205d2 [Masayoshi TSUZUKI] Fixed and added test code. 29a04a9 [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark into feature/SPARK-2458-2 f9ef854 [Masayoshi TSUZUKI] Added space between "if" and "(". Fixed "Incomplete" as capitalized in the web UI. Modified double negative variable name. 9b465b0 [Masayoshi TSUZUKI] Modified typo and better implementation. 3ed8a41 [Masayoshi TSUZUKI] Modified too long lines. 08ea14d [Masayoshi TSUZUKI] [SPARK-2458] Make failed application log visible on History Server
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala60
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala14
4 files changed, 72 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index fbe39b2764..553bf3cb94 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
- sparkUser: String)
+ sparkUser: String,
+ completed: Boolean = false)
private[spark] abstract class ApplicationHistoryProvider {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 792d15b99e..2b084a2d73 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
- val isFinishedApplication =
- if (isLegacyLogDirectory(entry)) {
- fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
- } else {
- !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
- }
-
- if (isFinishedApplication) {
- val modTime = getModificationTime(entry)
- newLastModifiedTime = math.max(newLastModifiedTime, modTime)
- modTime >= lastModifiedTime
- } else {
- false
- }
+ val modTime = getModificationTime(entry)
+ newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+ modTime >= lastModifiedTime
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
- .sortBy { info => -info.endTime }
+ .sortBy { info => (-info.endTime, -info.startTime) }
lastModifiedTime = newLastModifiedTime
@@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
- appListener.sparkUser.getOrElse(NOT_STARTED))
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
@@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
/** Returns the system's mononotically increasing time. */
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
+ /**
+ * Return true when the application has completed.
+ */
+ private def isApplicationCompleted(entry: FileStatus): Boolean = {
+ if (isLegacyLogDirectory(entry)) {
+ fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+ } else {
+ !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+ }
+ }
+
}
private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
- sparkUser: String)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
+ sparkUser: String,
+ completed: Boolean = true)
+ extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 0d5dcfb1dd..e4e7bc2216 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize
+ val requestedIncomplete =
+ Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList()
+ val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
@@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
- <span style="float: right">
- {
- if (actualPage > 1) {
- <a href={"/?page=" + (actualPage - 1)}>&lt; </a>
- <a href={"/?page=1"}>1</a>
- }
+ {if (requestedIncomplete) "(Incomplete applications)"}
+ <span style="float: right">
+ {
+ if (actualPage > 1) {
+ <a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt; </a>
+ <a href={makePageLink(1, requestedIncomplete)}>1</a>
}
- {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
- {leftSideIndices}
- {actualPage}
- {rightSideIndices}
- {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
- {
- if (actualPage < pageCount) {
- <a href={"/?page=" + pageCount}>{pageCount}</a>
- <a href={"/?page=" + (actualPage + 1)}> &gt;</a>
- }
+ }
+ {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
+ {leftSideIndices}
+ {actualPage}
+ {rightSideIndices}
+ {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
+ {
+ if (actualPage < pageCount) {
+ <a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
+ <a href={makePageLink(actualPage + 1, requestedIncomplete)}> &gt;</a>
}
- </span>
+ }
+ </span>
</h4> ++
appTable
} else {
@@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</p>
}
}
+ <a href={makePageLink(actualPage, !requestedIncomplete)}>
+ {
+ if (requestedIncomplete) {
+ "Back to completed applications"
+ } else {
+ "Show incomplete applications"
+ }
+ }
+ </a>
</div>
</div>
UIUtils.basicSparkPage(content, "History Server")
@@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
- val endTime = UIUtils.formatDate(info.endTime)
- val duration = UIUtils.formatDuration(info.endTime - info.startTime)
+ val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+ val duration =
+ if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.id}</a></td>
@@ -130,4 +143,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}
+
+ private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+ "/?" + Array(
+ "page=" + linkPage,
+ "showIncomplete=" + showIncomplete
+ ).mkString("&")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d719e9301f..8379883e06 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -64,7 +64,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
// Write an unfinished app, new-style.
- writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), true, None,
+ val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ writeFile(logFile2, true, None,
SparkListenerApplicationStart("app2-2", None, 1L, "test")
)
@@ -92,12 +93,17 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val list = provider.getListing().toSeq
list should not be (null)
- list.size should be (2)
+ list.size should be (4)
+ list.count(e => e.completed) should be (2)
list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
- oldLog.lastModified(), "test"))
+ oldLog.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
- logFile1.lastModified(), "test"))
+ logFile1.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
+ oldLog2.lastModified(), "test", false))
+ list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
+ logFile2.lastModified(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>