-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  115
1 file changed, 74 insertions(+), 41 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3e3d6ff29f..c5fab1d440 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,22 +18,23 @@
package org.apache.spark.deploy.history
import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable
import scala.concurrent.duration.Duration
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.permission.AccessControlException
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
/**
* A class that provides application history from event logs stored in the file system.
@@ -98,6 +99,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}
+ /**
+ * An ExecutorService used to fetch and parse log files.
+ */
+ private val replayExecutor: ExecutorService = {
+ if (!conf.contains("spark.testing")) {
+ Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+ } else {
+ MoreExecutors.sameThreadExecutor()
+ }
+ }
+
initialize()
private def initialize(): Unit = {
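The executor choice above follows a common testability pattern: production runs get a dedicated single-threaded background worker, while test runs (signalled by spark.testing) get Guava's same-thread executor, so submitted tasks execute synchronously and assertions can observe their effects immediately. A minimal standalone sketch of the same pattern, assuming a Guava version that still ships sameThreadExecutor (newer Guava renames it newDirectExecutorService) and inlining a stand-in for Utils.namedThreadFactory:

    import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger
    import com.google.common.util.concurrent.MoreExecutors

    object ReplayExecutorSketch {
      // Stand-in for Utils.namedThreadFactory: daemon threads with a readable name.
      private def namedThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
        private val counter = new AtomicInteger(0)
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
          t.setDaemon(true)
          t
        }
      }

      // isTesting stands in for conf.contains("spark.testing").
      def makeReplayExecutor(isTesting: Boolean): ExecutorService =
        if (!isTesting) {
          // One background thread: log replay never blocks the calling thread,
          // and batches are processed in submission order.
          Executors.newSingleThreadExecutor(namedThreadFactory("log-replay-executor"))
        } else {
          // Runs each task on the submitting thread, making tests deterministic.
          MoreExecutors.sameThreadExecutor()
        }
    }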
@@ -171,10 +183,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private[history] def checkForLogs(): Unit = {
try {
- var newLastModifiedTime = lastModifiedTime
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
- val logInfos = statusList
+ var newLastModifiedTime = lastModifiedTime
+ val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
getModificationTime(entry).map { time =>
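Before batching, the surviving entries are sorted newest-first by modification time. One detail worth a standalone illustration (hypothetical data): sortWith expects a strict ordering, so the comparison should be `>` rather than `>=`; a non-strict comparator breaks the sort contract when two files share a timestamp.

    object NewestFirstSketch {
      // LogEntry is a simplified stand-in for Hadoop's FileStatus.
      case class LogEntry(path: String, modTime: Long)

      def newestFirst(entries: Seq[LogEntry]): Seq[LogEntry] =
        entries.sortWith((a, b) => a.modTime > b.modTime) // strict: > not >=

      def main(args: Array[String]): Unit = {
        val entries = Seq(
          LogEntry("app-1", 100L),
          LogEntry("app-2", 300L),
          LogEntry("app-3", 200L))
        // Prints app-2, app-3, app-1.
        newestFirst(entries).foreach(e => println(e.path))
      }
    }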
@@ -189,48 +201,69 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
false
}
}
- .flatMap { entry =>
- try {
- Some(replay(entry, new ReplayListenerBus()))
- } catch {
- case e: Exception =>
- logError(s"Failed to load application log data from $entry.", e)
- None
- }
- }
- .sortWith(compareAppInfo)
+ .sortWith { case (entry1, entry2) =>
+ val mod1 = getModificationTime(entry1).getOrElse(-1L)
+ val mod2 = getModificationTime(entry2).getOrElse(-1L)
+ mod1 > mod2
+ }
+
+ logInfos.sliding(20, 20).foreach { batch =>
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit = mergeApplicationListing(batch)
+ })
+ }
lastModifiedTime = newLastModifiedTime
+ } catch {
+ case e: Exception => logError("Exception in checking for event log updates", e)
+ }
+ }
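sliding(20, 20) walks the sorted listing in non-overlapping windows of at most 20 files (the last window may be shorter), and each window becomes one task on replayExecutor, so the history server can begin serving applications while older logs are still being replayed. A small illustration of the batching on its own, with integers standing in for FileStatus entries:

    object BatchSketch {
      def main(args: Array[String]): Unit = {
        val logs = (1 to 45).toSeq
        logs.sliding(20, 20).foreach { batch =>
          // In FsHistoryProvider each batch is wrapped in a Runnable and
          // submitted to replayExecutor; here we only show the boundaries.
          println(s"batch of ${batch.size}: ${batch.head}..${batch.last}")
        }
        // Prints:
        //   batch of 20: 1..20
        //   batch of 20: 21..40
        //   batch of 5: 41..45
      }
    }

grouped(20) would express the same partitioning; sliding(n, n) with step equal to window size is equivalent.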
- // When there are new logs, merge the new list with the existing one, maintaining
- // the expected ordering (descending end time). Maintaining the order is important
- // to avoid having to sort the list every time there is a request for the log list.
- if (!logInfos.isEmpty) {
- val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo) = {
- if (!newApps.contains(info.id) ||
- newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
- !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- newApps += (info.id -> info)
- }
+ /**
+ * Replay the log files in the list and merge the resulting list of applications with the existing one.
+ */
+ private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
+ val bus = new ReplayListenerBus()
+ val newApps = logs.flatMap { fileStatus =>
+ try {
+ val res = replay(fileStatus, bus)
+ logInfo(s"Application log ${res.logPath} loaded successfully.")
+ Some(res)
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Exception encountered when attempting to load application log ${fileStatus.getPath}", e)
+ None
+ }
+ }.sortWith(compareAppInfo)
+
+ // When there are new logs, merge the new list with the existing one, maintaining
+ // the expected ordering (descending end time). Maintaining the order is important
+ // to avoid having to sort the list every time there is a request for the log list.
+ if (newApps.nonEmpty) {
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id) ||
+ mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+ !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+ mergedApps += (info.id -> info)
}
+ }
- val newIterator = logInfos.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next)
- } else {
- addIfAbsent(oldIterator.next)
- }
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
}
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
-
- applications = newApps
}
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
}
}
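The merge itself is a single pass over two sequences already sorted by the same comparator: repeatedly take whichever head compares first, let addIfAbsent drop duplicate ids (in the real code it also lets a completed log replace an in-progress one), then drain whatever remains on either side. Because both inputs keep the descending end-time order, the merged LinkedHashMap preserves it without re-sorting the whole listing on every scan. A simplified, self-contained sketch, with AppInfo and an end-time comparison standing in for FsApplicationHistoryInfo and compareAppInfo:

    import scala.collection.mutable

    object MergeSketch {
      case class AppInfo(id: String, endTime: Long)

      // Matches the "descending end time" ordering the listing maintains.
      private def newerThan(a: AppInfo, b: AppInfo): Boolean = a.endTime > b.endTime

      def merge(newApps: Seq[AppInfo], oldApps: Seq[AppInfo]): Seq[AppInfo] = {
        val merged = new mutable.LinkedHashMap[String, AppInfo]()
        def addIfAbsent(info: AppInfo): Unit =
          if (!merged.contains(info.id)) merged += (info.id -> info)

        val newIt = newApps.iterator.buffered
        val oldIt = oldApps.iterator.buffered
        while (newIt.hasNext && oldIt.hasNext) {
          if (newerThan(newIt.head, oldIt.head)) addIfAbsent(newIt.next())
          else addIfAbsent(oldIt.next())
        }
        newIt.foreach(addIfAbsent)
        oldIt.foreach(addIfAbsent)
        merged.values.toSeq
      }

      def main(args: Array[String]): Unit = {
        val existing = Seq(AppInfo("a", 50L), AppInfo("b", 10L))
        val fresh = Seq(AppInfo("c", 40L), AppInfo("a", 30L))
        // Prints a(50), c(40), b(10): c slots into place and the stale
        // duplicate of "a" is dropped by addIfAbsent.
        merge(fresh, existing).foreach(println)
      }
    }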