aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorVinayak <vijoshi5@in.ibm.com>2016-11-11 12:54:16 -0600
committerTom Graves <tgraves@yahoo-inc.com>2016-11-11 12:54:16 -0600
commita531fe1a82ec515314f2db2e2305283fef24067f (patch)
treed7ffd900c5445da3fc6d6e3981574ef415c0c667 /core/src
parent4f15d94cfec86130f8dab28ae2e228ded8124020 (diff)
downloadspark-a531fe1a82ec515314f2db2e2305283fef24067f.tar.gz
spark-a531fe1a82ec515314f2db2e2305283fef24067f.tar.bz2
spark-a531fe1a82ec515314f2db2e2305283fef24067f.zip
[SPARK-17843][WEB UI] Indicate event logs pending for processing on history server UI
## What changes were proposed in this pull request? The History Server UI's application listing now displays how many event logs are currently being processed, so a user knows that an application may not yet be listed on the UI while its event log is still pending. Once a scan has completed, the application list page also shows a "Last Updated" date-time at the top, indicating the date-time of the last _completed_ scan of the event logs. The value is displayed in the user's local time zone. ## How was this patch tested? All unit tests pass. In particular, all the suites under org.apache.spark.deploy.history.\* were run to test the changes. - Very first startup - Pending logs - no logs processed yet: <img width="1280" alt="screen shot 2016-10-24 at 3 07 04 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640981/b8d2a96a-99fc-11e6-9b1f-2d736fe90e48.png"> - Very first startup - Pending logs - some logs processed: <img width="1280" alt="screen shot 2016-10-24 at 3 18 42 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641087/3f8e3bae-99fd-11e6-9ef1-e0e70d71d8ef.png"> - Last updated - No currently pending logs: <img width="1280" alt="screen shot 2016-10-17 at 8 34 37 pm" src="https://cloud.githubusercontent.com/assets/12079825/19443100/4d13946c-94a9-11e6-8ee2-c442729bb206.png"> - Last updated - With some currently pending logs: <img width="1280" alt="screen shot 2016-10-24 at 3 09 31 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640903/7323ba3a-99fc-11e6-8359-6a45753dbb28.png"> - No applications found and No currently pending logs: <img width="1280" alt="screen shot 2016-10-24 at 3 24 26 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641364/03a2cb04-99fe-11e6-87d6-d09587fc6201.png"> Author: Vinayak <vijoshi5@in.ibm.com> Closes #15410 from vijoshi/SAAS-608_master.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/historypage-common.js24
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala59
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala8
5 files changed, 116 insertions, 18 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
new file mode 100644
index 0000000000..55d540d831
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ready(function() {
+ if ($('#last-updated').length) {
+ var lastUpdatedMillis = Number($('#last-updated').text());
+ var updatedDate = new Date(lastUpdatedMillis);
+ $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
+ }
+});
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 06530ff836..d7d82800b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -75,6 +75,30 @@ private[history] case class LoadedAppUI(
private[history] abstract class ApplicationHistoryProvider {
/**
+ * Returns the count of application event logs that the provider is currently still processing.
+ * History Server UI can use this to indicate to a user that the application listing on the UI
+ * can be expected to list additional known applications once the processing of these
+ * application event logs completes.
+ *
+ * A History Provider that does not have a notion of count of event logs that may be pending
+ * for processing need not override this method.
+ *
+ * @return Count of application event logs that are currently under process
+ */
+  def getEventLogsUnderProcess(): Int = {
+    0 // default for providers that do not track event logs pending processing
+  }
+
+ /**
+ * Returns the time the history provider last updated the application history information
+ *
+ * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
+ */
+  def getLastUpdatedTime(): Long = {
+    0L // default: the last updated time is undefined or unsupported
+  }
+
+ /**
* Returns a list of applications available for the history server to show.
*
* @return List of all known applications.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index dfc1aad64c..ca38a47639 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable
@@ -108,7 +108,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// The modification time of the newest log detected during the last scan. Used for logging msgs
// and as the UI's last-updated time (logs are re-scanned based on file size, rather than modtime)
- private var lastScanTime = -1L
+ private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -120,6 +120,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+ private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -226,6 +228,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
applications.get(appId)
}
+ override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() // submitted replay tasks not yet awaited
+
+  override def getLastUpdatedTime(): Long = math.max(lastScanTime.get(), 0L) // -1 (no scan yet) => 0
+
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
try {
applications.get(appId).flatMap { appInfo =>
@@ -329,26 +335,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
- logInfos.map { file =>
- replayExecutor.submit(new Runnable {
+
+ var tasks = mutable.ListBuffer[Future[_]]()
+
+ try {
+ for (file <- logInfos) {
+ tasks += replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(file)
})
}
- .foreach { task =>
- try {
- // Wait for all tasks to finish. This makes sure that checkForLogs
- // is not scheduled again while some tasks are already running in
- // the replayExecutor.
- task.get()
- } catch {
- case e: InterruptedException =>
- throw e
- case e: Exception =>
- logError("Exception while merging application listings", e)
- }
+ } catch {
+ // let the iteration over logInfos break, since an exception on
+ // replayExecutor.submit (..) indicates the ExecutorService is unable
+ // to take any more submissions at this time
+
+ case e: Exception =>
+ logError(s"Exception while submitting event log for replay", e)
+ }
+
+ pendingReplayTasksCount.addAndGet(tasks.size)
+
+ tasks.foreach { task =>
+ try {
+ // Wait for all tasks to finish. This makes sure that checkForLogs
+ // is not scheduled again while some tasks are already running in
+ // the replayExecutor.
+ task.get()
+ } catch {
+ case e: InterruptedException =>
+ throw e
+ case e: Exception =>
+ logError("Exception while merging application listings", e)
+ } finally {
+ pendingReplayTasksCount.decrementAndGet()
}
+ }
- lastScanTime = newLastScanTime
+ lastScanTime.set(newLastScanTime)
} catch {
case e: Exception => logError("Exception in checking for event log updates", e)
}
@@ -365,7 +388,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
- lastScanTime
+ lastScanTime.get()
} finally {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 96b9ecf43b..0e7a6c24d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -30,14 +30,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
+ val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
+ val lastUpdatedTime = parent.getLastUpdatedTime()
val providerConfig = parent.getProviderConfig()
val content =
+ <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
<div>
<div class="span12">
<ul class="unstyled">
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
+ if (eventLogsUnderProcessCount > 0) {
+ <p>There are {eventLogsUnderProcessCount} event log(s) currently being
+ processed which may result in additional applications getting listed on this page.
+ Refresh the page to view updates. </p>
+ }
+ }
+
+ {
+ if (lastUpdatedTime > 0) {
+ <p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
+ }
+ }
+
+ {
if (allAppsSize > 0) {
<script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
<div id="history-summary" class="span12 pagination"></div> ++
@@ -46,6 +63,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
<script>setAppLimit({parent.maxApplications})</script>
} else if (requestedIncomplete) {
<h4>No incomplete applications found!</h4>
+ } else if (eventLogsUnderProcessCount > 0) {
+ <h4>No completed applications found!</h4>
} else {
<h4>No completed applications found!</h4> ++ parent.emptyListingHtml
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 3175b36b3e..7e21fa681a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -179,6 +179,14 @@ class HistoryServer(
provider.getListing()
}
+  def getEventLogsUnderProcess(): Int =
+    // Forward the provider's count of event logs that are still being processed.
+    provider.getEventLogsUnderProcess()
+
+  def getLastUpdatedTime(): Long =
+    // Forward the provider's last updated time as-is.
+    provider.getLastUpdatedTime()
+
def getApplicationInfoList: Iterator[ApplicationInfo] = {
getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}