author    guliangliang <guliangliang@qiyi.com>    2015-03-02 15:33:23 -0800
committer Andrew Or <andrew@databricks.com>       2015-03-02 15:33:23 -0800
commit 26c1c56dea5d4160913bb65bb743aeb63fee3240 (patch)
tree   1502ed4dcd7aff6832615873c328c9ea07317cfa
parent 6b348d90f475440c285a4b636134ffa9351580b9 (diff)
[SPARK-5522] Accelerate the History Server start
When starting the history server, all log files are fetched and parsed in order to obtain the applications' metadata, e.g. app name, start time, duration, etc. In our production cluster, there are 2600 log files (160 GB) in HDFS, and it takes 3 hours to restart the history server, which is a bit too long for us. It would be better if the history server could show applications with missing information during startup and fill in the missing information after fetching and parsing each log file.

Author: guliangliang <guliangliang@qiyi.com>

Closes #4525 from marsishandsome/Spark5522 and squashes the following commits:

a865c11 [guliangliang] fix bug2
4340c2b [guliangliang] fix bug
af92a5a [guliangliang] [SPARK-5522] Accelerate the History Server start
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  115
1 file changed, 74 insertions(+), 41 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3e3d6ff29f..c5fab1d440 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,22 +18,23 @@
package org.apache.spark.deploy.history
import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable
import scala.concurrent.duration.Duration
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.permission.AccessControlException
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
/**
* A class that provides application history from event logs stored in the file system.
@@ -98,6 +99,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}
+ /**
+ * An Executor to fetch and parse log files.
+ */
+ private val replayExecutor: ExecutorService = {
+ if (!conf.contains("spark.testing")) {
+ Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+ } else {
+ MoreExecutors.sameThreadExecutor()
+ }
+ }
+
initialize()
private def initialize(): Unit = {
@@ -171,10 +183,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private[history] def checkForLogs(): Unit = {
try {
- var newLastModifiedTime = lastModifiedTime
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
- val logInfos = statusList
+ var newLastModifiedTime = lastModifiedTime
+ val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
getModificationTime(entry).map { time =>
@@ -189,48 +201,69 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
false
}
}
- .flatMap { entry =>
- try {
- Some(replay(entry, new ReplayListenerBus()))
- } catch {
- case e: Exception =>
- logError(s"Failed to load application log data from $entry.", e)
- None
- }
- }
- .sortWith(compareAppInfo)
+ .flatMap { entry => Some(entry) }
+ .sortWith { case (entry1, entry2) =>
+ val mod1 = getModificationTime(entry1).getOrElse(-1L)
+ val mod2 = getModificationTime(entry2).getOrElse(-1L)
+ mod1 >= mod2
+ }
+
+ logInfos.sliding(20, 20).foreach { batch =>
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit = mergeApplicationListing(batch)
+ })
+ }
lastModifiedTime = newLastModifiedTime
+ } catch {
+ case e: Exception => logError("Exception in checking for event log updates", e)
+ }
+ }
- // When there are new logs, merge the new list with the existing one, maintaining
- // the expected ordering (descending end time). Maintaining the order is important
- // to avoid having to sort the list every time there is a request for the log list.
- if (!logInfos.isEmpty) {
- val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo) = {
- if (!newApps.contains(info.id) ||
- newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
- !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- newApps += (info.id -> info)
- }
+ /**
+ * Replay the log files in the list and merge the list of old applications with new ones
+ */
+ private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
+ val bus = new ReplayListenerBus()
+ val newApps = logs.flatMap { fileStatus =>
+ try {
+ val res = replay(fileStatus, bus)
+ logInfo(s"Application log ${res.logPath} loaded successfully.")
+ Some(res)
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Exception encountered when attempting to load application log ${fileStatus.getPath}")
+ None
+ }
+ }.toSeq.sortWith(compareAppInfo)
+
+ // When there are new logs, merge the new list with the existing one, maintaining
+ // the expected ordering (descending end time). Maintaining the order is important
+ // to avoid having to sort the list every time there is a request for the log list.
+ if (newApps.nonEmpty) {
+ val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+ def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+ if (!mergedApps.contains(info.id) ||
+ mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+ !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+ mergedApps += (info.id -> info)
}
+ }
- val newIterator = logInfos.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next)
- } else {
- addIfAbsent(oldIterator.next)
- }
+ val newIterator = newApps.iterator.buffered
+ val oldIterator = applications.values.iterator.buffered
+ while (newIterator.hasNext && oldIterator.hasNext) {
+ if (compareAppInfo(newIterator.head, oldIterator.head)) {
+ addIfAbsent(newIterator.next())
+ } else {
+ addIfAbsent(oldIterator.next())
}
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
-
- applications = newApps
}
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
+ newIterator.foreach(addIfAbsent)
+ oldIterator.foreach(addIfAbsent)
+
+ applications = mergedApps
}
}