aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala60
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala14
4 files changed, 72 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index fbe39b2764..553bf3cb94 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
- sparkUser: String)
+ sparkUser: String,
+ completed: Boolean = false)
private[spark] abstract class ApplicationHistoryProvider {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 792d15b99e..2b084a2d73 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
- val isFinishedApplication =
- if (isLegacyLogDirectory(entry)) {
- fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
- } else {
- !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
- }
-
- if (isFinishedApplication) {
- val modTime = getModificationTime(entry)
- newLastModifiedTime = math.max(newLastModifiedTime, modTime)
- modTime >= lastModifiedTime
- } else {
- false
- }
+ val modTime = getModificationTime(entry)
+ newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+ modTime >= lastModifiedTime
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
- .sortBy { info => -info.endTime }
+ .sortBy { info => (-info.endTime, -info.startTime) }
lastModifiedTime = newLastModifiedTime
@@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
- appListener.sparkUser.getOrElse(NOT_STARTED))
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
@@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
/** Returns the system's mononotically increasing time. */
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
+ /**
+ * Return true when the application has completed.
+ */
+ private def isApplicationCompleted(entry: FileStatus): Boolean = {
+ if (isLegacyLogDirectory(entry)) {
+ fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+ } else {
+ !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+ }
+ }
+
}
private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
- sparkUser: String)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
+ sparkUser: String,
+ completed: Boolean = true)
+ extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 0d5dcfb1dd..e4e7bc2216 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize
+ val requestedIncomplete =
+ Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList()
+ val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
@@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
- <span style="float: right">
- {
- if (actualPage > 1) {
- <a href={"/?page=" + (actualPage - 1)}>&lt; </a>
- <a href={"/?page=1"}>1</a>
- }
+ {if (requestedIncomplete) "(Incomplete applications)"}
+ <span style="float: right">
+ {
+ if (actualPage > 1) {
+ <a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt; </a>
+ <a href={makePageLink(1, requestedIncomplete)}>1</a>
}
- {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
- {leftSideIndices}
- {actualPage}
- {rightSideIndices}
- {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
- {
- if (actualPage < pageCount) {
- <a href={"/?page=" + pageCount}>{pageCount}</a>
- <a href={"/?page=" + (actualPage + 1)}> &gt;</a>
- }
+ }
+ {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
+ {leftSideIndices}
+ {actualPage}
+ {rightSideIndices}
+ {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
+ {
+ if (actualPage < pageCount) {
+ <a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
+ <a href={makePageLink(actualPage + 1, requestedIncomplete)}> &gt;</a>
}
- </span>
+ }
+ </span>
</h4> ++
appTable
} else {
@@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</p>
}
}
+ <a href={makePageLink(actualPage, !requestedIncomplete)}>
+ {
+ if (requestedIncomplete) {
+ "Back to completed applications"
+ } else {
+ "Show incomplete applications"
+ }
+ }
+ </a>
</div>
</div>
UIUtils.basicSparkPage(content, "History Server")
@@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
- val endTime = UIUtils.formatDate(info.endTime)
- val duration = UIUtils.formatDuration(info.endTime - info.startTime)
+ val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+ val duration =
+ if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.id}</a></td>
@@ -130,4 +143,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}
+
+ private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+ "/?" + Array(
+ "page=" + linkPage,
+ "showIncomplete=" + showIncomplete
+ ).mkString("&")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d719e9301f..8379883e06 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -64,7 +64,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
// Write an unfinished app, new-style.
- writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), true, None,
+ val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ writeFile(logFile2, true, None,
SparkListenerApplicationStart("app2-2", None, 1L, "test")
)
@@ -92,12 +93,17 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val list = provider.getListing().toSeq
list should not be (null)
- list.size should be (2)
+ list.size should be (4)
+ list.count(e => e.completed) should be (2)
list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
- oldLog.lastModified(), "test"))
+ oldLog.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
- logFile1.lastModified(), "test"))
+ logFile1.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
+ oldLog2.lastModified(), "test", false))
+ list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
+ logFile2.lastModified(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>