Diffstat (limited to 'core/src')
 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 120
 core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala      |  39
2 files changed, 101 insertions, 58 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 530cc52522..dfc1aad64c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -78,10 +79,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
import FsHistoryProvider._
- private val NOT_STARTED = "<Not Started>"
-
- private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
-
// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
"spark.history.fs.safemodeCheck.interval", "5s")
@@ -241,11 +238,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
- val appListener = new ApplicationEventListener()
- replayBus.addListener(appListener)
- val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)),
- replayBus)
- appAttemptInfo.map { info =>
+
+ val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+ val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+
+ if (appListener.appId.isDefined) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
@@ -254,8 +252,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.viewAcls.getOrElse(""))
ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
- LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+ Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+ } else {
+ None
}
+
}
}
} catch {
@@ -411,28 +412,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
-
/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
val newAttempts = try {
- val bus = new ReplayListenerBus()
- val res = replay(fileStatus, bus)
- res match {
- case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
- case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
- "The application may have not started.")
- }
- res
- } catch {
- case e: Exception =>
- logError(
- s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
- e)
- None
+ val eventsFilter: ReplayEventsFilter = { eventString =>
+ eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+ eventString.startsWith(APPL_END_EVENT_PREFIX)
+ }
+
+ val logPath = fileStatus.getPath()
+
+ val appCompleted = isApplicationCompleted(fileStatus)
+
+ val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
+
+ // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+ // try to show their UI.
+ if (appListener.appId.isDefined) {
+ val attemptInfo = new FsApplicationAttemptInfo(
+ logPath.getName(),
+ appListener.appName.getOrElse(NOT_STARTED),
+ appListener.appId.getOrElse(logPath.getName()),
+ appListener.appAttemptId,
+ appListener.startTime.getOrElse(-1L),
+ appListener.endTime.getOrElse(-1L),
+ fileStatus.getModificationTime(),
+ appListener.sparkUser.getOrElse(NOT_STARTED),
+ appCompleted,
+ fileStatus.getLen()
+ )
+ fileToAppInfo(logPath) = attemptInfo
+ logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
+ Some(attemptInfo)
+ } else {
+ logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+ "The application may have not started.")
+ None
}
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
+ e)
+ None
+ }
+
if (newAttempts.isEmpty) {
return
}
@@ -564,12 +591,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Replays the events in the specified log file and returns information about the associated
- * application. Return `None` if the application ID cannot be located.
+ * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns
+ * an `ApplicationEventListener` instance with event data captured from the replay.
+ * `ReplayEventsFilter` determines what events are replayed and can therefore limit the
+ * data captured in the returned `ApplicationEventListener` instance.
*/
private def replay(
eventLog: FileStatus,
- bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+ appCompleted: Boolean,
+ bus: ReplayListenerBus,
+ eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
@@ -581,30 +612,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
- val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
- bus.replay(logInput, logPath.toString, !appCompleted)
-
- // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
- // try to show their UI.
- if (appListener.appId.isDefined) {
- val attemptInfo = new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- eventLog.getModificationTime(),
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted,
- eventLog.getLen()
- )
- fileToAppInfo(logPath) = attemptInfo
- Some(attemptInfo)
- } else {
- None
- }
+ bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+ appListener
} finally {
logInput.close()
}
@@ -689,6 +699,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+ private val NOT_STARTED = "<Not Started>"
+
+ private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
+ private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
+
+ private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
}
/**
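The FsHistoryProvider side of this change hinges on the two JSON prefixes defined in the companion object above: event logs store one JSON object per line with "Event" as the first field, so a plain startsWith check can pick out application start/end events without JSON-parsing every line. A minimal standalone Scala sketch of that idea, not part of the patch (the sample log lines are invented for illustration):

// Standalone sketch of the prefix-based event filter used by
// mergeApplicationListing above. Sample lines are illustrative only.
object EventFilterSketch {
  type ReplayEventsFilter = String => Boolean

  // Prefixes mirror APPL_START_EVENT_PREFIX / APPL_END_EVENT_PREFIX above.
  val startPrefix = "{\"Event\":\"SparkListenerApplicationStart\""
  val endPrefix = "{\"Event\":\"SparkListenerApplicationEnd\""

  val appStartEndFilter: ReplayEventsFilter = { eventString =>
    eventString.startsWith(startPrefix) || eventString.startsWith(endPrefix)
  }

  def main(args: Array[String]): Unit = {
    val sampleLines = Seq(
      "{\"Event\":\"SparkListenerApplicationStart\",\"App Name\":\"demo\"}",
      "{\"Event\":\"SparkListenerTaskEnd\",\"Stage ID\":0}",
      "{\"Event\":\"SparkListenerApplicationEnd\",\"Timestamp\":1}")
    // Only the start and end events survive; other events are never JSON-parsed.
    sampleLines.filter(appStartEndFilter).foreach(println)
  }
}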
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d32f5eb7bf..3eff8d952b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.util.JsonProtocol
/**
@@ -43,30 +44,45 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
* @param sourceName Filename (or other source identifier) from which @logData is being read
* @param maybeTruncated Indicates whether the log file might be truncated (in some abnormal
* situations, the log file might not have finished writing) or not
+ * @param eventsFilter Filter function to select JSON event strings in the log data stream that
+ * should be parsed and replayed. When not specified, all event strings in the log data
+ * are parsed and replayed.
*/
def replay(
logData: InputStream,
sourceName: String,
- maybeTruncated: Boolean = false): Unit = {
+ maybeTruncated: Boolean = false,
+ eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
var currentLine: String = null
- var lineNumber: Int = 1
+ var lineNumber: Int = 0
+
try {
- val lines = Source.fromInputStream(logData).getLines()
- while (lines.hasNext) {
- currentLine = lines.next()
+ val lineEntries = Source.fromInputStream(logData)
+ .getLines()
+ .zipWithIndex
+ .filter { case (line, _) => eventsFilter(line) }
+
+ while (lineEntries.hasNext) {
try {
+ val entry = lineEntries.next()
+
+ currentLine = entry._1
+ lineNumber = entry._2 + 1
+
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
- if (!maybeTruncated || lines.hasNext) {
+ // The last entry may not be the very last line in the event log, but we treat it
+ // as such in a best-effort attempt to replay the given input
+ if (!maybeTruncated || lineEntries.hasNext) {
throw jpe
} else {
logWarning(s"Got JsonParseException from log file $sourceName" +
s" at line $lineNumber, the file might not have finished writing cleanly.")
}
}
- lineNumber += 1
}
} catch {
case ioe: IOException =>
@@ -78,3 +94,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
}
}
+
+
+private[spark] object ReplayListenerBus {
+
+ type ReplayEventsFilter = (String) => Boolean
+
+ // Utility filter that selects all events during replay
+ val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+}
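Putting the two files together, a caller might use the new eventsFilter parameter as follows. This is a sketch only: these classes are private[spark], so a real caller must live inside the org.apache.spark.scheduler package, and the example log line and filter below are invented for illustration:

package org.apache.spark.scheduler

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import org.apache.spark.scheduler.ReplayListenerBus._

object ReplayFilterExample {
  def main(args: Array[String]): Unit = {
    val bus = new ReplayListenerBus()
    val appListener = new ApplicationEventListener
    bus.addListener(appListener)

    // Hypothetical filter: only application-level events get parsed and posted.
    val onlyAppEvents: ReplayEventsFilter = { line =>
      line.startsWith("{\"Event\":\"SparkListenerApplication")
    }

    val log = "{\"Event\":\"SparkListenerApplicationEnd\",\"Timestamp\":100}"
    val in = new ByteArrayInputStream(log.getBytes(StandardCharsets.UTF_8))

    // Lines rejected by the filter are skipped before JSON parsing; lineNumber
    // still reflects positions in the original stream thanks to zipWithIndex.
    bus.replay(in, sourceName = "example-log", eventsFilter = onlyAppEvents)

    println(appListener.endTime) // Some(100), captured from the replayed event
  }
}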