diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 115 |
1 files changed, 74 insertions, 41 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3e3d6ff29f..c5fab1d440 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,22 +18,23 @@ package org.apache.spark.deploy.history import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream} -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import scala.collection.mutable import scala.concurrent.duration.Duration import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.hadoop.fs.{FileStatus, Path} +import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.permission.AccessControlException - -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SecurityManager, SparkConf} + /** * A class that provides application history from event logs stored in the file system. @@ -98,6 +99,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } + /** + * An Executor to fetch and parse log files. + */ + private val replayExecutor: ExecutorService = { + if (!conf.contains("spark.testing")) { + Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor")) + } else { + MoreExecutors.sameThreadExecutor() + } + } + initialize() private def initialize(): Unit = { @@ -171,10 +183,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private[history] def checkForLogs(): Unit = { try { - var newLastModifiedTime = lastModifiedTime val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) .getOrElse(Seq[FileStatus]()) - val logInfos = statusList + var newLastModifiedTime = lastModifiedTime + val logInfos: Seq[FileStatus] = statusList .filter { entry => try { getModificationTime(entry).map { time => @@ -189,48 +201,69 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis false } } - .flatMap { entry => - try { - Some(replay(entry, new ReplayListenerBus())) - } catch { - case e: Exception => - logError(s"Failed to load application log data from $entry.", e) - None - } - } - .sortWith(compareAppInfo) + .flatMap { entry => Some(entry) } + .sortWith { case (entry1, entry2) => + val mod1 = getModificationTime(entry1).getOrElse(-1L) + val mod2 = getModificationTime(entry2).getOrElse(-1L) + mod1 >= mod2 + } + + logInfos.sliding(20, 20).foreach { batch => + replayExecutor.submit(new Runnable { + override def run(): Unit = mergeApplicationListing(batch) + }) + } lastModifiedTime = newLastModifiedTime + } catch { + case e: Exception => logError("Exception in checking for event log updates", e) + } + } - // When there are new logs, merge the new list with the existing one, maintaining - // the expected ordering (descending end time). Maintaining the order is important - // to avoid having to sort the list every time there is a request for the log list. - if (!logInfos.isEmpty) { - val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo) = { - if (!newApps.contains(info.id) || - newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) && - !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { - newApps += (info.id -> info) - } + /** + * Replay the log files in the list and merge the list of old applications with new ones + */ + private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = { + val bus = new ReplayListenerBus() + val newApps = logs.flatMap { fileStatus => + try { + val res = replay(fileStatus, bus) + logInfo(s"Application log ${res.logPath} loaded successfully.") + Some(res) + } catch { + case e: Exception => + logError( + s"Exception encountered when attempting to load application log ${fileStatus.getPath}") + None + } + }.toSeq.sortWith(compareAppInfo) + + // When there are new logs, merge the new list with the existing one, maintaining + // the expected ordering (descending end time). Maintaining the order is important + // to avoid having to sort the list every time there is a request for the log list. + if (newApps.nonEmpty) { + val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { + if (!mergedApps.contains(info.id) || + mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) && + !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { + mergedApps += (info.id -> info) } + } - val newIterator = logInfos.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next) - } else { - addIfAbsent(oldIterator.next) - } + val newIterator = newApps.iterator.buffered + val oldIterator = applications.values.iterator.buffered + while (newIterator.hasNext && oldIterator.hasNext) { + if (compareAppInfo(newIterator.head, oldIterator.head)) { + addIfAbsent(newIterator.next()) + } else { + addIfAbsent(oldIterator.next()) } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) - - applications = newApps } - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) + newIterator.foreach(addIfAbsent) + oldIterator.foreach(addIfAbsent) + + applications = mergedApps } } |