Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  92
1 file changed, 49 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2bd4a46e16..07cbcec8e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -79,6 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val NOT_STARTED = "<Not Started>"
+ private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
"spark.history.fs.safemodeCheck.interval", "5s")
@@ -89,6 +91,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Interval at which the cleaner checks for event logs to delete
private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
+ // Number of threads used to replay event logs.
+ private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
+ Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)
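The default above works out to one replay thread per four available cores, rounded up, unless spark.history.fs.numReplayThreads overrides it. A minimal sketch of that arithmetic, with illustrative local names (available, defaultThreads are not from the patch):

    // e.g. 8 cores -> Math.ceil(8 / 4f) = 2 replay threads by default
    val available = Runtime.getRuntime.availableProcessors()
    val defaultThreads = Math.ceil(available / 4f).toInt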
@@ -129,11 +135,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * An Executor to fetch and parse log files.
+ * A fixed-size thread pool to fetch and parse log files.
*/
private val replayExecutor: ExecutorService = {
if (!conf.contains("spark.testing")) {
- ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
+ ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
} else {
MoreExecutors.sameThreadExecutor()
}
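ThreadUtils.newDaemonFixedThreadPool is Spark-internal; assuming it behaves like an ordinary fixed pool whose threads are marked daemon (so they never keep the JVM alive on shutdown), it could be approximated with plain java.util.concurrent as below. This is a sketch of the assumed behavior, not Spark's implementation:

    import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical stand-in for ThreadUtils.newDaemonFixedThreadPool:
    // a fixed-size pool with named daemon threads.
    def daemonFixedPool(nThreads: Int, prefix: String): ExecutorService =
      Executors.newFixedThreadPool(nThreads, new ThreadFactory {
        private val count = new AtomicInteger(0)
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${count.getAndIncrement()}")
          t.setDaemon(true)
          t
        }
      })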
@@ -297,10 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
- logInfos.grouped(20)
- .map { batch =>
+ logInfos.map { file =>
replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(batch)
+ override def run(): Unit = mergeApplicationListing(file)
})
}
.foreach { task =>
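The scheduling change replaces batches of 20 files on a single thread with one task per file on the shared pool, so one oversized event log can no longer delay every other log in its batch. A sketch of the new shape, where pool, files, and process stand in for replayExecutor, logInfos, and mergeApplicationListing:

    // One Runnable per file; the patch's trailing .foreach { task => ... }
    // then waits on each returned Future in turn.
    val tasks = files.map { file =>
      pool.submit(new Runnable {
        override def run(): Unit = process(file)
      })
    }
    tasks.foreach(_.get())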
@@ -385,9 +390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
- * Replay the log files in the list and merge the list of old applications with new ones
+ * Replay the given log file and merge the result into the list of known applications.
*/
- private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
- val newAttempts = logs.flatMap { fileStatus =>
- try {
+ private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+ val newAttempts = try {
val bus = new ReplayListenerBus()
val res = replay(fileStatus, bus)
res match {
@@ -403,7 +407,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
e)
None
}
- }
if (newAttempts.isEmpty) {
return
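With the signature narrowed to a single FileStatus, a parse failure is now logged and isolated to that one file instead of aborting a whole 20-file batch. A sketch of the per-file contract, assuming the surrounding class's replay and logError and using an illustrative name (replaySingle):

    // Illustrative only: one file in, optional attempt out.
    def replaySingle(fileStatus: FileStatus): Option[FsApplicationAttemptInfo] =
      try {
        val bus = new ReplayListenerBus()
        replay(fileStatus, bus)
      } catch {
        case e: Exception =>
          logError(s"Exception encountered when attempting to load application log ${fileStatus.getPath}", e)
          None
      }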
@@ -413,45 +416,48 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// contains both the new app attempt, and those that were already loaded in the existing apps
// map. If an attempt has been updated, it replaces the old attempt in the list.
val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
- newAttempts.foreach { attempt =>
- val appInfo = newAppMap.get(attempt.appId)
- .orElse(applications.get(attempt.appId))
- .map { app =>
- val attempts =
- app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
- new FsApplicationHistoryInfo(attempt.appId, attempt.name,
- attempts.sortWith(compareAttemptInfo))
- }
- .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
- newAppMap(attempt.appId) = appInfo
- }
- // Merge the new app list with the existing one, maintaining the expected ordering (descending
- // end time). Maintaining the order is important to avoid having to sort the list every time
- // there is a request for the log list.
- val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
- val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
- if (!mergedApps.contains(info.id)) {
- mergedApps += (info.id -> info)
+ applications.synchronized {
+ newAttempts.foreach { attempt =>
+ val appInfo = newAppMap.get(attempt.appId)
+ .orElse(applications.get(attempt.appId))
+ .map { app =>
+ val attempts =
+ app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
+ new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+ attempts.sortWith(compareAttemptInfo))
+ }
+ .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+ newAppMap(attempt.appId) = appInfo
}
- }
- val newIterator = newApps.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (newAppMap.contains(oldIterator.head.id)) {
- oldIterator.next()
- } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next())
- } else {
- addIfAbsent(oldIterator.next())
+ // Merge the new app list with the existing one, maintaining the expected ordering (descending
+ // end time). Maintaining the order is important to avoid having to sort the list every time
+ // there is a request for the log list.
+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id)) {
+ mergedApps += (info.id -> info)
+ }
}
- }
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
- applications = mergedApps
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (newAppMap.contains(oldIterator.head.id)) {
+ oldIterator.next()
+ } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
+ }
+ }
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
+ }
}
/**
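Because several replay threads can now finish concurrently, the read-merge-replace of applications happens inside applications.synchronized. The merge itself interleaves two lists already sorted by the same comparator, dropping old entries superseded by a new attempt, so the listing never needs a full re-sort. A self-contained sketch of that merge with illustrative names (App, byEndTimeDesc are not Spark's):

    import scala.collection.mutable

    case class App(id: String, endTime: Long)
    // Descending end time, matching the ordering the listing maintains.
    def byEndTimeDesc(a: App, b: App): Boolean = a.endTime > b.endTime

    def mergeSorted(newApps: Seq[App], oldApps: Seq[App]): Seq[App] = {
      val updated = newApps.map(_.id).toSet
      val merged = mutable.LinkedHashMap[String, App]()
      def addIfAbsent(a: App): Unit =
        if (!merged.contains(a.id)) merged += (a.id -> a)
      val ni = newApps.iterator.buffered
      val oi = oldApps.iterator.buffered
      while (ni.hasNext && oi.hasNext) {
        if (updated.contains(oi.head.id)) oi.next()            // superseded; skip
        else if (byEndTimeDesc(ni.head, oi.head)) addIfAbsent(ni.next())
        else addIfAbsent(oi.next())
      }
      ni.foreach(addIfAbsent)
      oi.foreach(addIfAbsent)
      merged.values.toSeq
    }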