aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBryan Cutler <bjcutler@us.ibm.com>2015-12-15 18:28:16 -0800
committerAndrew Or <andrew@databricks.com>2015-12-15 18:28:16 -0800
commitc5b6b398d5e368626e589feede80355fb74c2bd8 (patch)
treea5f22830ec409b885b2ab004d5e646441c1c4064 /core
parent8a215d2338c6286253e20122640592f9d69896c8 (diff)
downloadspark-c5b6b398d5e368626e589feede80355fb74c2bd8.tar.gz
spark-c5b6b398d5e368626e589feede80355fb74c2bd8.tar.bz2
spark-c5b6b398d5e368626e589feede80355fb74c2bd8.zip
[SPARK-12062][CORE] Change Master to async rebuild UI when application completes
This change builds the event history of completed apps asynchronously so the RPC thread will not be blocked and allow new workers to register/remove if the event log history is very large and takes a long time to rebuild. Author: Bryan Cutler <bjcutler@us.ibm.com> Closes #10284 from BryanCutler/async-MasterUI-SPARK-12062.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala79
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala2
2 files changed, 52 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1355e1ad1b..fc42bf06e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
-import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.Random
@@ -56,6 +58,10 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
+ private val rebuildUIThread =
+ ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
+ private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
+
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -78,7 +84,8 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
- private val appIdToUI = new HashMap[String, SparkUI]
+ // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
+ private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -191,6 +198,7 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
+ rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
@@ -367,6 +375,10 @@ private[deploy] class Master(
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
+
+ case AttachCompletedRebuildUI(appId) =>
+ // An asyncRebuildSparkUI has completed, so need to attach to master webUi
+ Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -809,7 +821,7 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
- appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
+ Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
@@ -818,7 +830,7 @@ private[deploy] class Master(
waitingApps -= app
// If application events are logged, use them to rebuild the UI
- rebuildSparkUI(app)
+ asyncRebuildSparkUI(app)
for (exec <- app.executors.values) {
killExecutor(exec)
@@ -923,49 +935,57 @@ private[deploy] class Master(
* Return the UI if successful, else None
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+ val futureUI = asyncRebuildSparkUI(app)
+ Await.result(futureUI, Duration.Inf)
+ }
+
+ /** Rebuild a new SparkUI asynchronously to not block RPC event loop */
+ private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
- try {
- val eventLogDir = app.desc.eventLogDir
- .getOrElse {
- // Event logging is not enabled for this application
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
- return None
- }
-
+ val eventLogDir = app.desc.eventLogDir
+ .getOrElse {
+ // Event logging is disabled for this application
+ app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
+ return Future.successful(None)
+ }
+ val futureUI = Future {
val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
+ eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
- EventLoggingListener.IN_PROGRESS))
+ EventLoggingListener.IN_PROGRESS))
- if (inProgressExists) {
+ val eventLogFile = if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
- }
-
- val (eventLogFile, status) = if (inProgressExists) {
- (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
+ eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
} else {
- (eventLogFilePrefix, " (completed)")
+ eventLogFilePrefix
}
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
- val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
- replayBus.replay(logInput, eventLogFile, maybeTruncated)
+ replayBus.replay(logInput, eventLogFile, inProgressExists)
} finally {
logInput.close()
}
- appIdToUI(app.id) = ui
- webUi.attachSparkUI(ui)
+
+ Some(ui)
+ }(rebuildUIContext)
+
+ futureUI.onSuccess { case Some(ui) =>
+ appIdToUI.put(app.id, ui)
+ self.send(AttachCompletedRebuildUI(app.id))
// Application UI is successfully rebuilt, so link the Master UI to it
+ // NOTE - app.appUIUrlAtHistoryServer is volatile
app.appUIUrlAtHistoryServer = Some(ui.basePath)
- Some(ui)
- } catch {
+ }(ThreadUtils.sameThread)
+
+ futureUI.onFailure {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
@@ -974,7 +994,7 @@ private[deploy] class Master(
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
- None
+
case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
@@ -984,8 +1004,9 @@ private[deploy] class Master(
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer =
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
- None
- }
+ }(ThreadUtils.sameThread)
+
+ futureUI
}
/** Generate a new app ID given a app's submission date */
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index a952cee36e..a055d09767 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -39,4 +39,6 @@ private[master] object MasterMessages {
case object BoundPortsRequest
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
+
+ case class AttachCompletedRebuildUI(appId: String)
}