aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala79
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala2
2 files changed, 52 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1355e1ad1b..fc42bf06e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
-import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.Random
@@ -56,6 +58,10 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
+ private val rebuildUIThread =
+ ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
+ private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
+
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -78,7 +84,8 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
- private val appIdToUI = new HashMap[String, SparkUI]
+ // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
+ private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -191,6 +198,7 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
+ rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
@@ -367,6 +375,10 @@ private[deploy] class Master(
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
+
+ case AttachCompletedRebuildUI(appId) =>
+ // An asyncRebuildSparkUI has completed, so need to attach to master webUi
+ Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -809,7 +821,7 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
- appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
+ Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
@@ -818,7 +830,7 @@ private[deploy] class Master(
waitingApps -= app
// If application events are logged, use them to rebuild the UI
- rebuildSparkUI(app)
+ asyncRebuildSparkUI(app)
for (exec <- app.executors.values) {
killExecutor(exec)
@@ -923,49 +935,57 @@ private[deploy] class Master(
* Return the UI if successful, else None
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+ val futureUI = asyncRebuildSparkUI(app)
+ Await.result(futureUI, Duration.Inf)
+ }
+
+ /** Rebuild a new SparkUI asynchronously to not block RPC event loop */
+ private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
- try {
- val eventLogDir = app.desc.eventLogDir
- .getOrElse {
- // Event logging is not enabled for this application
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
- return None
- }
-
+ val eventLogDir = app.desc.eventLogDir
+ .getOrElse {
+ // Event logging is disabled for this application
+ app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
+ return Future.successful(None)
+ }
+ val futureUI = Future {
val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
+ eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
- EventLoggingListener.IN_PROGRESS))
+ EventLoggingListener.IN_PROGRESS))
- if (inProgressExists) {
+ val eventLogFile = if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
- }
-
- val (eventLogFile, status) = if (inProgressExists) {
- (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
+ eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
} else {
- (eventLogFilePrefix, " (completed)")
+ eventLogFilePrefix
}
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
- val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
- replayBus.replay(logInput, eventLogFile, maybeTruncated)
+ replayBus.replay(logInput, eventLogFile, inProgressExists)
} finally {
logInput.close()
}
- appIdToUI(app.id) = ui
- webUi.attachSparkUI(ui)
+
+ Some(ui)
+ }(rebuildUIContext)
+
+ futureUI.onSuccess { case Some(ui) =>
+ appIdToUI.put(app.id, ui)
+ self.send(AttachCompletedRebuildUI(app.id))
// Application UI is successfully rebuilt, so link the Master UI to it
+ // NOTE - app.appUIUrlAtHistoryServer is volatile
app.appUIUrlAtHistoryServer = Some(ui.basePath)
- Some(ui)
- } catch {
+ }(ThreadUtils.sameThread)
+
+ futureUI.onFailure {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
@@ -974,7 +994,7 @@ private[deploy] class Master(
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
- None
+
case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
@@ -984,8 +1004,9 @@ private[deploy] class Master(
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer =
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
- None
- }
+ }(ThreadUtils.sameThread)
+
+ futureUI
}
/** Generate a new app ID given a app's submission date */
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index a952cee36e..a055d09767 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -39,4 +39,6 @@ private[master] object MasterMessages {
case object BoundPortsRequest
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
+
+ case class AttachCompletedRebuildUI(appId: String)
}