author    Vinayak <vijoshi5@in.ibm.com>  2016-10-25 10:36:03 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-10-25 10:36:03 -0700
commit    c5fe3dd4f59c464c830b414acccd3cca0fdd877c (patch)
tree      3a7019e1b0e159d528c0da914ac2ff02680d0d8b /core/src
parent    ac8ff920faec6ee06e17212e2b5d2ee117495e87 (diff)
download  spark-c5fe3dd4f59c464c830b414acccd3cca0fdd877c.tar.gz
          spark-c5fe3dd4f59c464c830b414acccd3cca0fdd877c.tar.bz2
          spark-c5fe3dd4f59c464c830b414acccd3cca0fdd877c.zip
[SPARK-18010][CORE] Reduce work performed for building up the application list for the History Server app list UI page
## What changes were proposed in this pull request?

Allow ReplayListenerBus to skip deserialising and replaying certain events, using an inexpensive check of the raw event log entry. Use this to ensure that when event log replay is triggered for building the application list, the ReplayListenerBus skips over all but the few events needed for that immediate purpose. Refer to [SPARK-18010] for the motivation behind this change.

## How was this patch tested?

Tested with the existing HistoryServer and ReplayListener unit test suites. All tests pass.

Author: Vinayak <vijoshi5@in.ibm.com>

Closes #15556 from vijoshi/SAAS-467_master.
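For context, the skip mechanism boils down to a cheap string-prefix predicate applied to each raw JSON line before any parsing happens. Below is a minimal, self-contained sketch of the idea; the filter type and event prefixes mirror the ones this patch introduces, while `ReplayFilterSketch` and its `main` are illustrative names, not part of the patch:

```scala
// Sketch of the inexpensive pre-parse check introduced by this patch.
// A ReplayEventsFilter is just a predicate over the raw JSON line; matching
// on the serialized event-type prefix avoids deserialising unwanted events.
object ReplayFilterSketch {
  type ReplayEventsFilter = String => Boolean

  private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
  private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""

  // Selects only application start/end events, which is all the
  // History Server app list page needs.
  val appStartEndFilter: ReplayEventsFilter = { eventString =>
    eventString.startsWith(APPL_START_EVENT_PREFIX) ||
      eventString.startsWith(APPL_END_EVENT_PREFIX)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      """{"Event":"SparkListenerApplicationStart","App Name":"demo"}""",
      """{"Event":"SparkListenerTaskEnd","Task Info":{}}""",
      """{"Event":"SparkListenerApplicationEnd","Timestamp":1}"""
    )
    // Only the start/end events survive; the task event is never parsed.
    lines.filter(appStartEndFilter).foreach(println)
  }
}
```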
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 120
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala      |  39
2 files changed, 101 insertions(+), 58 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 530cc52522..dfc1aad64c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -78,10 +79,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
import FsHistoryProvider._
- private val NOT_STARTED = "<Not Started>"
-
- private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
-
// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
"spark.history.fs.safemodeCheck.interval", "5s")
@@ -241,11 +238,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
- val appListener = new ApplicationEventListener()
- replayBus.addListener(appListener)
- val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)),
- replayBus)
- appAttemptInfo.map { info =>
+
+ val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+ val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+
+ if (appListener.appId.isDefined) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
@@ -254,8 +252,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.viewAcls.getOrElse(""))
ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
- LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+ Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+ } else {
+ None
}
+
}
}
} catch {
@@ -411,28 +412,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
-
/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
val newAttempts = try {
- val bus = new ReplayListenerBus()
- val res = replay(fileStatus, bus)
- res match {
- case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
- case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
- "The application may have not started.")
- }
- res
- } catch {
- case e: Exception =>
- logError(
- s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
- e)
- None
+ val eventsFilter: ReplayEventsFilter = { eventString =>
+ eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+ eventString.startsWith(APPL_END_EVENT_PREFIX)
+ }
+
+ val logPath = fileStatus.getPath()
+
+ val appCompleted = isApplicationCompleted(fileStatus)
+
+ val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
+
+ // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+ // try to show their UI.
+ if (appListener.appId.isDefined) {
+ val attemptInfo = new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ fileStatus.getModificationTime(),
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted,
+ fileStatus.getLen()
+ )
+ fileToAppInfo(logPath) = attemptInfo
+ logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
+ Some(attemptInfo)
+ } else {
+ logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ None
}
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
+ e)
+ None
+ }
+
if (newAttempts.isEmpty) {
return
}
@@ -564,12 +591,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Replays the events in the specified log file and returns information about the associated
- * application. Return `None` if the application ID cannot be located.
+ * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns
+ * an `ApplicationEventListener` instance with event data captured from the replay.
+ * `ReplayEventsFilter` determines what events are replayed and can therefore limit the
+ * data captured in the returned `ApplicationEventListener` instance.
*/
private def replay(
eventLog: FileStatus,
- bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+ appCompleted: Boolean,
+ bus: ReplayListenerBus,
+ eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
@@ -581,30 +612,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
- val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
- bus.replay(logInput, logPath.toString, !appCompleted)
-
- // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
- // try to show their UI.
- if (appListener.appId.isDefined) {
- val attemptInfo = new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- eventLog.getModificationTime(),
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted,
- eventLog.getLen()
- )
- fileToAppInfo(logPath) = attemptInfo
- Some(attemptInfo)
- } else {
- None
- }
+ bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+ appListener
} finally {
logInput.close()
}
@@ -689,6 +699,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ private val NOT_STARTED = "<Not Started>"
+
+ private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
+ private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
+
+ private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d32f5eb7bf..3eff8d952b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.util.JsonProtocol
/**
@@ -43,30 +44,45 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
* @param sourceName Filename (or other source identifier) from whence @logData is being read
* @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
* encountered, log file might not finished writing) or not
+ * @param eventsFilter Filter function to select JSON event strings in the log data stream that
+ * should be parsed and replayed. When not specified, all event strings in the log data
+ * are parsed and replayed.
*/
def replay(
logData: InputStream,
sourceName: String,
- maybeTruncated: Boolean = false): Unit = {
+ maybeTruncated: Boolean = false,
+ eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
var currentLine: String = null
- var lineNumber: Int = 1
+ var lineNumber: Int = 0
+
try {
- val lines = Source.fromInputStream(logData).getLines()
- while (lines.hasNext) {
- currentLine = lines.next()
+ val lineEntries = Source.fromInputStream(logData)
+ .getLines()
+ .zipWithIndex
+ .filter { case (line, _) => eventsFilter(line) }
+
+ while (lineEntries.hasNext) {
try {
+ val entry = lineEntries.next()
+
+ currentLine = entry._1
+ lineNumber = entry._2 + 1
+
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
- if (!maybeTruncated || lines.hasNext) {
+ // the last entry may not be the very last line in the event log, but we treat it
+ // as such in a best effort to replay the given input
+ if (!maybeTruncated || lineEntries.hasNext) {
throw jpe
} else {
logWarning(s"Got JsonParseException from log file $sourceName" +
s" at line $lineNumber, the file might not have finished writing cleanly.")
}
}
- lineNumber += 1
}
} catch {
case ioe: IOException =>
@@ -78,3 +94,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
}
}
+
+
+private[spark] object ReplayListenerBus {
+
+ type ReplayEventsFilter = (String) => Boolean
+
+ // utility filter that selects all event logs during replay
+ val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+}
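For reference, a minimal usage sketch of the new `replay` signature under stated assumptions: the example pretends to live inside the `org.apache.spark.scheduler` package (both `ReplayListenerBus` and `ApplicationEventListener` are `private[spark]`), and the event log path is hypothetical:

```scala
package org.apache.spark.scheduler

import java.io.FileInputStream

import org.apache.spark.scheduler.ReplayListenerBus._

object FilteredReplayExample {
  def main(args: Array[String]): Unit = {
    // Cheap pre-parse predicate: only application start/end events get
    // deserialised; every other line is skipped on the raw JSON string.
    val filter: ReplayEventsFilter = { line =>
      line.startsWith("{\"Event\":\"SparkListenerApplicationStart\"") ||
        line.startsWith("{\"Event\":\"SparkListenerApplicationEnd\"")
    }

    val bus = new ReplayListenerBus()
    val appListener = new ApplicationEventListener()
    bus.addListener(appListener)

    // Hypothetical plain-text event log path; real logs may be compressed,
    // in which case EventLoggingListener.openEventLog would be used instead.
    val logStream = new FileInputStream("/tmp/spark-events/app-20161025-0001")
    try {
      bus.replay(logStream, sourceName = "app-20161025-0001",
        maybeTruncated = false, eventsFilter = filter)
    } finally {
      logStream.close()
    }

    // Only data carried by the start/end events is populated.
    println(appListener.appId -> appListener.appName)
  }
}
```

This is the same pattern `FsHistoryProvider.mergeApplicationListing` now uses: register an `ApplicationEventListener`, replay with a narrow filter, and read the handful of fields the listing page needs, without paying JSON deserialisation costs for the bulk of the log.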