aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBryan Cutler <cutlerb@gmail.com>2016-05-04 14:29:54 -0700
committerAndrew Or <andrew@databricks.com>2016-05-04 14:29:54 -0700
commitcf2e9da612397233ae7bca0e9ce57309f16226b5 (patch)
tree496c54cf5f861c8f9916671b91a415729e54e8bc /core
parent0c00391f77359efdbf9dbd26d4c8186be8839922 (diff)
downloadspark-cf2e9da612397233ae7bca0e9ce57309f16226b5.tar.gz
spark-cf2e9da612397233ae7bca0e9ce57309f16226b5.tar.bz2
spark-cf2e9da612397233ae7bca0e9ce57309f16226b5.zip
[SPARK-12299][CORE] Remove history serving functionality from Master
Remove history server functionality from standalone Master. Previously, the Master process rebuilt a SparkUI once the application was completed which sometimes caused problems, such as OOM, when the application event log is large (see SPARK-6270). Keeping this functionality out of the Master will help to simplify the process and increase stability. Testing for this change included running core unit tests and manually running an application on a standalone cluster to verify that it completed successfully and that the Master UI functioned correctly. Also added 2 unit tests to verify killing an application and driver from MasterWebUI makes the correct request to the Master. Author: Bryan Cutler <cutlerb@gmail.com> Closes #10991 from BryanCutler/remove-history-master-SPARK-12299.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala109
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala73
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala30
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala118
10 files changed, 86 insertions, 311 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 4ffb5283e9..53564d0e95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -41,7 +41,6 @@ private[spark] class ApplicationInfo(
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
- @transient @volatile var appUIUrlAtHistoryServer: Option[String] = None
// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
@@ -66,7 +65,6 @@ private[spark] class ApplicationInfo(
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
- appUIUrlAtHistoryServer = None
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -136,11 +134,4 @@ private[spark] class ApplicationInfo(
System.currentTimeMillis() - startTime
}
}
-
- /**
- * Returns the original application UI url unless there is its address at history server
- * is defined
- */
- def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)
-
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index edc9be2a8a..faed4f4dc9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,25 +17,17 @@
package org.apache.spark.deploy.master
-import java.io.FileNotFoundException
-import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
-import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.Duration
-import scala.language.postfixOps
import scala.util.Random
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -43,9 +35,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}
private[deploy] class Master(
@@ -59,10 +49,6 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
- private val rebuildUIThread =
- ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
- private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
-
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -85,8 +71,6 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
- // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
- private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -199,7 +183,6 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
- rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
@@ -391,9 +374,6 @@ private[deploy] class Master(
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
- case AttachCompletedRebuildUI(appId) =>
- // An asyncRebuildSparkUI has completed, so need to attach to master webUi
- Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -844,7 +824,6 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
- Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
}
completedApps.trimStart(toRemove)
@@ -852,9 +831,6 @@ private[deploy] class Master(
completedApps += app // Remember it in our history
waitingApps -= app
- // If application events are logged, use them to rebuild the UI
- asyncRebuildSparkUI(app)
-
for (exec <- app.executors.values) {
killExecutor(exec)
}
@@ -953,89 +929,6 @@ private[deploy] class Master(
exec.state = ExecutorState.KILLED
}
- /**
- * Rebuild a new SparkUI from the given application's event logs.
- * Return the UI if successful, else None
- */
- private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
- val futureUI = asyncRebuildSparkUI(app)
- ThreadUtils.awaitResult(futureUI, Duration.Inf)
- }
-
- /** Rebuild a new SparkUI asynchronously to not block RPC event loop */
- private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
- val appName = app.desc.name
- val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
- val eventLogDir = app.desc.eventLogDir
- .getOrElse {
- // Event logging is disabled for this application
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
- return Future.successful(None)
- }
- val futureUI = Future {
- val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
- val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
- val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
- EventLoggingListener.IN_PROGRESS))
-
- val eventLogFile = if (inProgressExists) {
- // Event logging is enabled for this application, but the application is still in progress
- logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
- eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
- } else {
- eventLogFilePrefix
- }
-
- val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
- val replayBus = new ReplayListenerBus()
- val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
- appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
- try {
- replayBus.replay(logInput, eventLogFile, inProgressExists)
- } finally {
- logInput.close()
- }
-
- Some(ui)
- }(rebuildUIContext)
-
- futureUI.onSuccess { case Some(ui) =>
- appIdToUI.put(app.id, ui)
- // `self` can be null if we are already in the process of shutting down
- // This happens frequently in tests where `local-cluster` is used
- if (self != null) {
- self.send(AttachCompletedRebuildUI(app.id))
- }
- // Application UI is successfully rebuilt, so link the Master UI to it
- // NOTE - app.appUIUrlAtHistoryServer is volatile
- app.appUIUrlAtHistoryServer = Some(ui.basePath)
- }(ThreadUtils.sameThread)
-
- futureUI.onFailure {
- case fnf: FileNotFoundException =>
- // Event logging is enabled for this application, but no event logs are found
- val title = s"Application history not found (${app.id})"
- var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}."
- logWarning(msg)
- msg += " Did you specify the correct logging directory?"
- msg = URLEncoder.encode(msg, "UTF-8")
- app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
-
- case e: Exception =>
- // Relay exception message to application UI page
- val title = s"Application history load error (${app.id})"
- val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
- var msg = s"Exception in replaying log for application $appName!"
- logError(msg, e)
- msg = URLEncoder.encode(msg, "UTF-8")
- app.appUIUrlAtHistoryServer =
- Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
- }(ThreadUtils.sameThread)
-
- futureUI
- }
-
/** Generate a new app ID given a app's submission date */
private def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index a055d09767..a952cee36e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -39,6 +39,4 @@ private[master] object MasterMessages {
case object BoundPortsRequest
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
-
- case class AttachCompletedRebuildUI(appId: String)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 96274958d1..8875fc2232 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -75,7 +75,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
- <li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
+ {
+ if (!app.isFinished) {
+ <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+ }
+ }
</ul>
</div>
</div>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
deleted file mode 100644
index e021f1eef7..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master.ui
-
-import java.net.URLDecoder
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[ui] class HistoryNotFoundPage(parent: MasterWebUI)
- extends WebUIPage("history/not-found") {
-
- /**
- * Render a page that conveys failure in loading application history.
- *
- * This accepts 3 HTTP parameters:
- * msg = message to display to the user
- * title = title of the page
- * exception = detailed description of the exception in loading application history (if any)
- *
- * Parameters "msg" and "exception" are assumed to be UTF-8 encoded.
- */
- def render(request: HttpServletRequest): Seq[Node] = {
- val titleParam = request.getParameter("title")
- val msgParam = request.getParameter("msg")
- val exceptionParam = request.getParameter("exception")
-
- // If no parameters are specified, assume the user did not enable event logging
- val defaultTitle = "Event logging is not enabled"
- val defaultContent =
- <div class="row-fluid">
- <div class="span12" style="font-size:14px">
- No event logs were found for this application! To
- <a href="http://spark.apache.org/docs/latest/monitoring.html">enable event logging</a>,
- set <span style="font-style:italic">spark.eventLog.enabled</span> to true and
- <span style="font-style:italic">spark.eventLog.dir</span> to the directory to which your
- event logs are written.
- </div>
- </div>
-
- val title = Option(titleParam).getOrElse(defaultTitle)
- val content = Option(msgParam)
- .map { msg => URLDecoder.decode(msg, "UTF-8") }
- .map { msg =>
- <div class="row-fluid">
- <div class="span12" style="font-size:14px">{msg}</div>
- </div> ++
- Option(exceptionParam)
- .map { e => URLDecoder.decode(e, "UTF-8") }
- .map { e => <pre>{e}</pre> }
- .getOrElse(Seq.empty)
- }.getOrElse(defaultContent)
-
- UIUtils.basicSparkPage(content, title)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 363f4b84f8..75de3ede78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{killLink}
</td>
<td>
- <a href={app.curAppUIUrl}>{app.desc.name}</a>
+ {
+ if (app.isFinished) {
+ app.desc.name
+ } else {
+ <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+ }
+ }
</td>
<td>
{app.coresGranted}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index ae16ce90c8..a0727ad83f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
-import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
- UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
@@ -30,60 +28,26 @@ import org.apache.spark.ui.JettyUtils._
private[master]
class MasterWebUI(
val master: Master,
- requestedPort: Int,
- customMasterPage: Option[MasterPage] = None)
+ requestedPort: Int)
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
- requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
+ requestedPort, master.conf, name = "MasterUI") with Logging {
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
- val masterPage = customMasterPage.getOrElse(new MasterPage(this))
-
initialize()
/** Initialize all components of the server. */
def initialize() {
val masterPage = new MasterPage(this)
attachPage(new ApplicationPage(this))
- attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
- attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
-
- /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
- def attachSparkUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
- ui.getHandlers.foreach(attachHandler)
- }
-
- /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
- def detachSparkUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
- ui.getHandlers.foreach(detachHandler)
- }
-
- def getApplicationInfoList: Iterator[ApplicationInfo] = {
- val state = masterPage.getMasterState
- val activeApps = state.activeApps.sortBy(_.startTime).reverse
- val completedApps = state.completedApps.sortBy(_.endTime).reverse
- activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
- completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
- }
-
- def getSparkUI(appId: String): Option[SparkUI] = {
- val state = masterPage.getMasterState
- val activeApps = state.activeApps.sortBy(_.startTime).reverse
- val completedApps = state.completedApps.sortBy(_.endTime).reverse
- (activeApps ++ completedApps).find { _.id == appId }.flatMap {
- master.rebuildSparkUI
- }
- }
}
private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index ba9cd711f1..2cd51a9ed5 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -205,7 +205,7 @@ private[spark] object ApiRootResource {
/**
* This trait is shared by the all the root containers for application UI information --
- * the HistoryServer, the Master UI, and the application UI. This provides the common
+ * the HistoryServer and the application UI. This provides the common
* interface needed for them all to expose application info as json.
*/
private[spark] trait UIRoot {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 0f30183682..02fd2985fa 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -21,7 +21,6 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType
import org.apache.spark.deploy.history.ApplicationHistoryInfo
-import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ApplicationListResource(uiRoot: UIRoot) {
@@ -84,33 +83,4 @@ private[spark] object ApplicationsListResource {
}
)
}
-
- def convertApplicationInfo(
- internal: InternalApplicationInfo,
- completed: Boolean): ApplicationInfo = {
- // standalone application info always has just one attempt
- new ApplicationInfo(
- id = internal.id,
- name = internal.desc.name,
- coresGranted = Some(internal.coresGranted),
- maxCores = internal.desc.maxCores,
- coresPerExecutor = internal.desc.coresPerExecutor,
- memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB),
- attempts = Seq(new ApplicationAttemptInfo(
- attemptId = None,
- startTime = new Date(internal.startTime),
- endTime = new Date(internal.endTime),
- duration =
- if (internal.endTime > 0) {
- internal.endTime - internal.startTime
- } else {
- 0
- },
- lastUpdated = new Date(internal.endTime),
- sparkUser = internal.desc.user,
- completed = completed
- ))
- )
- }
-
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 0c9382a92b..69a460fbc7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -17,74 +17,96 @@
package org.apache.spark.deploy.master.ui
+import java.io.DataOutputStream
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
import java.util.Date
-import scala.io.Source
-import scala.language.postfixOps
+import scala.collection.mutable.HashMap
-import org.json4s.jackson.JsonMethods._
-import org.json4s.JsonAST.{JInt, JNothing, JString}
-import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfter
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.DeployMessages.MasterStateResponse
+import org.apache.spark.deploy.DeployMessages.{KillDriverResponse, RequestKillDriver}
import org.apache.spark.deploy.DeployTestUtils._
import org.apache.spark.deploy.master._
-import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
-class MasterWebUISuite extends SparkFunSuite with BeforeAndAfter {
+class MasterWebUISuite extends SparkFunSuite with BeforeAndAfterAll {
- val masterPage = mock(classOf[MasterPage])
- val master = {
- val conf = new SparkConf
- val securityMgr = new SecurityManager(conf)
- val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
- val master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
- master
- }
- val masterWebUI = new MasterWebUI(master, 0, customMasterPage = Some(masterPage))
+ val conf = new SparkConf
+ val securityMgr = new SecurityManager(conf)
+ val rpcEnv = mock(classOf[RpcEnv])
+ val master = mock(classOf[Master])
+ val masterEndpointRef = mock(classOf[RpcEndpointRef])
+ when(master.securityMgr).thenReturn(securityMgr)
+ when(master.conf).thenReturn(conf)
+ when(master.rpcEnv).thenReturn(rpcEnv)
+ when(master.self).thenReturn(masterEndpointRef)
+ val masterWebUI = new MasterWebUI(master, 0)
- before {
+ override def beforeAll() {
+ super.beforeAll()
masterWebUI.bind()
}
- after {
+ override def afterAll() {
masterWebUI.stop()
+ super.afterAll()
}
- test("list applications") {
- val worker = createWorkerInfo()
+ test("kill application") {
val appDesc = createAppDesc()
// use new start date so it isn't filtered by UI
val activeApp = new ApplicationInfo(
- new Date().getTime, "id", appDesc, new Date(), null, Int.MaxValue)
- activeApp.addExecutor(worker, 2)
-
- val workers = Array[WorkerInfo](worker)
- val activeApps = Array(activeApp)
- val completedApps = Array[ApplicationInfo]()
- val activeDrivers = Array[DriverInfo]()
- val completedDrivers = Array[DriverInfo]()
- val stateResponse = new MasterStateResponse(
- "host", 8080, None, workers, activeApps, completedApps,
- activeDrivers, completedDrivers, RecoveryState.ALIVE)
-
- when(masterPage.getMasterState).thenReturn(stateResponse)
-
- val resultJson = Source.fromURL(
- s"http://localhost:${masterWebUI.boundPort}/api/v1/applications")
- .mkString
- val parsedJson = parse(resultJson)
- val firstApp = parsedJson(0)
-
- assert(firstApp \ "id" === JString(activeApp.id))
- assert(firstApp \ "name" === JString(activeApp.desc.name))
- assert(firstApp \ "coresGranted" === JInt(2))
- assert(firstApp \ "maxCores" === JInt(4))
- assert(firstApp \ "memoryPerExecutorMB" === JInt(1234))
- assert(firstApp \ "coresPerExecutor" === JNothing)
+ new Date().getTime, "app-0", appDesc, new Date(), null, Int.MaxValue)
+
+ when(master.idToApp).thenReturn(HashMap[String, ApplicationInfo]((activeApp.id, activeApp)))
+
+ val url = s"http://localhost:${masterWebUI.boundPort}/app/kill/"
+ val body = convPostDataToString(Map(("id", activeApp.id), ("terminate", "true")))
+ val conn = sendHttpRequest(url, "POST", body)
+ conn.getResponseCode
+
+ // Verify the master was called to remove the active app
+ verify(master, times(1)).removeApplication(activeApp, ApplicationState.KILLED)
+ }
+
+ test("kill driver") {
+ val activeDriverId = "driver-0"
+ val url = s"http://localhost:${masterWebUI.boundPort}/driver/kill/"
+ val body = convPostDataToString(Map(("id", activeDriverId), ("terminate", "true")))
+ val conn = sendHttpRequest(url, "POST", body)
+ conn.getResponseCode
+
+ // Verify that master was asked to kill driver with the correct id
+ verify(masterEndpointRef, times(1)).ask[KillDriverResponse](RequestKillDriver(activeDriverId))
}
+ private def convPostDataToString(data: Map[String, String]): String = {
+ (for ((name, value) <- data) yield s"$name=$value").mkString("&")
+ }
+
+ /**
+ * Send an HTTP request to the given URL using the method and the body specified.
+ * Return the connection object.
+ */
+ private def sendHttpRequest(
+ url: String,
+ method: String,
+ body: String = ""): HttpURLConnection = {
+ val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
+ conn.setRequestMethod(method)
+ if (body.nonEmpty) {
+ conn.setDoOutput(true)
+ conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded")
+ conn.setRequestProperty("Content-Length", Integer.toString(body.length))
+ val out = new DataOutputStream(conn.getOutputStream)
+ out.write(body.getBytes(StandardCharsets.UTF_8))
+ out.close()
+ }
+ conn
+ }
}