diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 120 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 39 |
2 files changed, 101 insertions, 58 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 530cc52522..dfc1aad64c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -78,10 +79,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) import FsHistoryProvider._ - private val NOT_STARTED = "<Not Started>" - - private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" - // Interval between safemode checks. private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds( "spark.history.fs.safemodeCheck.interval", "5s") @@ -241,11 +238,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } - val appListener = new ApplicationEventListener() - replayBus.addListener(appListener) - val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), - replayBus) - appAttemptInfo.map { info => + + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + + val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) + + if (appListener.appId.isDefined) { val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setAcls(uiAclsEnabled) // make sure to set admin acls before view acls so they are properly picked up @@ -254,8 +252,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.viewAcls.getOrElse("")) ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse("")) ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) + Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) + } else { + None } + } } } catch { @@ -411,28 +412,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - /** * Replay the log files in the list and merge the list of old applications with new ones */ private def mergeApplicationListing(fileStatus: FileStatus): Unit = { val newAttempts = try { - val bus = new ReplayListenerBus() - val res = replay(fileStatus, bus) - res match { - case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r") - case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + - "The application may have not started.") - } - res - } catch { - case e: Exception => - logError( - s"Exception encountered when attempting to load application log ${fileStatus.getPath}", - e) - None + val eventsFilter: ReplayEventsFilter = { eventString => + eventString.startsWith(APPL_START_EVENT_PREFIX) || + eventString.startsWith(APPL_END_EVENT_PREFIX) + } + + val logPath = fileStatus.getPath() + + val appCompleted = isApplicationCompleted(fileStatus) + + val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) + + // Without an app ID, new logs will render incorrectly in the listing page, so do not list or + // try to show their UI. + if (appListener.appId.isDefined) { + val attemptInfo = new FsApplicationAttemptInfo( + logPath.getName(), + appListener.appName.getOrElse(NOT_STARTED), + appListener.appId.getOrElse(logPath.getName()), + appListener.appAttemptId, + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), + fileStatus.getModificationTime(), + appListener.sparkUser.getOrElse(NOT_STARTED), + appCompleted, + fileStatus.getLen() + ) + fileToAppInfo(logPath) = attemptInfo + logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") + Some(attemptInfo) + } else { + logWarning(s"Failed to load application log ${fileStatus.getPath}. " + + "The application may have not started.") + None } + } catch { + case e: Exception => + logError( + s"Exception encountered when attempting to load application log ${fileStatus.getPath}", + e) + None + } + if (newAttempts.isEmpty) { return } @@ -564,12 +591,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file and returns information about the associated - * application. Return `None` if the application ID cannot be located. + * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns + * an `ApplicationEventListener` instance with event data captured from the replay. + * `ReplayEventsFilter` determines what events are replayed and can therefore limit the + * data captured in the returned `ApplicationEventListener` instance. */ private def replay( eventLog: FileStatus, - bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { + appCompleted: Boolean, + bus: ReplayListenerBus, + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -581,30 +612,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val logInput = EventLoggingListener.openEventLog(logPath, fs) try { val appListener = new ApplicationEventListener - val appCompleted = isApplicationCompleted(eventLog) bus.addListener(appListener) - bus.replay(logInput, logPath.toString, !appCompleted) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. - if (appListener.appId.isDefined) { - val attemptInfo = new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - eventLog.getModificationTime(), - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted, - eventLog.getLen() - ) - fileToAppInfo(logPath) = attemptInfo - Some(attemptInfo) - } else { - None - } + bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) + appListener } finally { logInput.close() } @@ -689,6 +699,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + private val NOT_STARTED = "<Not Started>" + + private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" + + private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" + + private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index d32f5eb7bf..3eff8d952b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.util.JsonProtocol /** @@ -43,30 +44,45 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * @param sourceName Filename (or other source identifier) from whence @logData is being read * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations * encountered, log file might not finished writing) or not + * @param eventsFilter Filter function to select JSON event strings in the log data stream that + * should be parsed and replayed. When not specified, all event strings in the log data + * are parsed and replayed. */ def replay( logData: InputStream, sourceName: String, - maybeTruncated: Boolean = false): Unit = { + maybeTruncated: Boolean = false, + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + var currentLine: String = null - var lineNumber: Int = 1 + var lineNumber: Int = 0 + try { - val lines = Source.fromInputStream(logData).getLines() - while (lines.hasNext) { - currentLine = lines.next() + val lineEntries = Source.fromInputStream(logData) + .getLines() + .zipWithIndex + .filter { case (line, _) => eventsFilter(line) } + + while (lineEntries.hasNext) { try { + val entry = lineEntries.next() + + currentLine = entry._1 + lineNumber = entry._2 + 1 + postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) } catch { case jpe: JsonParseException => // We can only ignore exception from last line of the file that might be truncated - if (!maybeTruncated || lines.hasNext) { + // the last entry may not be the very last line in the event log, but we treat it + // as such in a best effort to replay the given input + if (!maybeTruncated || lineEntries.hasNext) { throw jpe } else { logWarning(s"Got JsonParseException from log file $sourceName" + s" at line $lineNumber, the file might not have finished writing cleanly.") } } - lineNumber += 1 } } catch { case ioe: IOException => @@ -78,3 +94,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } } + + +private[spark] object ReplayListenerBus { + + type ReplayEventsFilter = (String) => Boolean + + // utility filter that selects all event logs during replay + val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true } +} |