Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml                                                                  12
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala               18
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala   13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala        12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala       16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala        6
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/JettyUtils.scala                      85
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala                         12
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala          27
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala                         37
12 files changed, 235 insertions(+), 9 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 69a0b0ff27..3c8138f974 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,16 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<scope>compile</scope>
</dependency>
@@ -388,7 +398,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
- guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
+ guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client
</includeArtifactIds>
<silent>true</silent>
</configuration>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4aa795a58a..e32e4aa5b8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -505,6 +505,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
+ if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
+ System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
+ }
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
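
For context, a minimal sketch of how a driver would opt into this behaviour; only the spark.ui.reverseProxy and spark.ui.reverseProxyUrl keys come from this change, while the master URL, app name and host names below are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder master URL and app name.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")
      .setAppName("reverse-proxy-demo")
      .set("spark.ui.reverseProxy", "true")
      .set("spark.ui.reverseProxyUrl", "http://master-host:8080")

    val sc = new SparkContext(conf)
    // With the proxy enabled, SparkContext sets spark.ui.proxyBase to /proxy/<application id>,
    // so links rendered by the application UI resolve under the master's /proxy/ prefix.
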
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dcf41638e7..8c91aa1516 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -114,6 +114,7 @@ private[deploy] class Master(
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
}
@@ -129,6 +130,11 @@ private[deploy] class Master(
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
+ if (reverseProxy) {
+ masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+ logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
+ s"Applications UIs are available at $masterWebUiUrl")
+ }
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
@@ -755,6 +761,9 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
+ if (reverseProxy) {
+ webUi.addProxyTargets(worker.id, worker.webUiAddress)
+ }
true
}
@@ -763,6 +772,9 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
+ if (reverseProxy) {
+ webUi.removeProxyTargets(worker.id)
+ }
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
@@ -810,6 +822,9 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
+ if (reverseProxy) {
+ webUi.addProxyTargets(app.id, app.desc.appUiUrl)
+ }
}
private def finishApplication(app: ApplicationInfo) {
@@ -823,6 +838,9 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
+ if (reverseProxy) {
+ webUi.removeProxyTargets(app.id)
+ }
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 8875fc2232..17c521cbf9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -77,7 +77,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
<li><strong>State:</strong> {app.state}</li>
{
if (!app.isFinished) {
- <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+ <li><strong>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ app.id, app.desc.appUiUrl)}>Application Detail UI</a>
+ </strong></li>
}
}
</ul>
@@ -100,19 +103,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
}
private def executorRow(executor: ExecutorDesc): Seq[Node] = {
+ val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
+ executor.worker.id, executor.worker.webUiAddress)
<tr>
<td>{executor.id}</td>
<td>
- <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+ <a href={workerUrlRef}>{executor.worker.id}</a>
</td>
<td>{executor.cores}</td>
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ .format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+ .format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 5ed3e39edc..3fb860582c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -176,7 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
- <a href={worker.webUiAddress}>{worker.id}</a>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ worker.id, worker.webUiAddress)}>{worker.id}</a>
</td>
<td>{worker.host}:{worker.port}</td>
<td>{worker.state}</td>
@@ -210,7 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (app.isFinished) {
app.desc.name
} else {
- <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
}
}
</td>
@@ -244,7 +246,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<tr>
<td>{driver.id} {killLink}</td>
<td>{driver.submitDate}</td>
- <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+ <td>{driver.worker.map(w =>
+ <a href=
+ {UIUtils.makeHref(parent.master.reverseProxy, w.id, w.webUiAddress)}>
+ {w.id.toString}</a>
+ ).getOrElse("None")}
</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a0727ad83f..8cfd0f6829 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,6 +17,10 @@
package org.apache.spark.deploy.master.ui
+import scala.collection.mutable.HashMap
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -34,6 +38,7 @@ class MasterWebUI(
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+ private val proxyHandlers = new HashMap[String, ServletContextHandler]
initialize()
@@ -48,6 +53,17 @@ class MasterWebUI(
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
+
+ def addProxyTargets(id: String, target: String): Unit = {
+ val endTarget = target.stripSuffix("/")
+ val handler = createProxyHandler("/proxy/" + id, endTarget)
+ attachHandler(handler)
+ proxyHandlers(id) = handler
+ }
+
+ def removeProxyTargets(id: String): Unit = {
+ proxyHandlers.remove(id).foreach(detachHandler)
+ }
}
private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea..d4d8521cc8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(
// Add webUI log urls
val baseUrl =
- s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
+ } else {
+ s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ }
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 724206bf94..0bedd9a20a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -203,6 +203,9 @@ private[deploy] class Worker(
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
+ }
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 50283f2b74..24f3f75715 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.xml.Node
+import org.eclipse.jetty.client.api.Response
+import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.server.{Request, Server, ServerConnector}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
@@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}
+ /** Create a handler for proxying requests to Workers and Application Drivers. */
+ def createProxyHandler(
+ prefix: String,
+ target: String): ServletContextHandler = {
+ val servlet = new ProxyServlet {
+ override def rewriteTarget(request: HttpServletRequest): String = {
+ val rewrittenURI = createProxyURI(
+ prefix, target, request.getRequestURI(), request.getQueryString())
+ if (rewrittenURI == null) {
+ return null
+ }
+ if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
+ return null
+ }
+ rewrittenURI.toString()
+ }
+
+ override def filterServerResponseHeader(
+ clientRequest: HttpServletRequest,
+ serverResponse: Response,
+ headerName: String,
+ headerValue: String): String = {
+ if (headerName.equalsIgnoreCase("location")) {
+ val newHeader = createProxyLocationHeader(
+ prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
+ if (newHeader != null) {
+ return newHeader
+ }
+ }
+ super.filterServerResponseHeader(
+ clientRequest, serverResponse, headerName, headerValue)
+ }
+ }
+
+ val contextHandler = new ServletContextHandler
+ val holder = new ServletHolder(servlet)
+ contextHandler.setContextPath(prefix)
+ contextHandler.addServlet(holder, "/")
+ contextHandler
+ }
+
/** Add filters, if any, to the given list of ServletContextHandlers */
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
@@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}
+ def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
+ if (!path.startsWith(prefix)) {
+ return null
+ }
+
+ val uri = new StringBuilder(target)
+ val rest = path.substring(prefix.length())
+
+ if (!rest.isEmpty()) {
+ if (!rest.startsWith("/")) {
+ uri.append("/")
+ }
+ uri.append(rest)
+ }
+
+ val rewrittenURI = URI.create(uri.toString())
+ if (query != null) {
+ return new URI(
+ rewrittenURI.getScheme(),
+ rewrittenURI.getAuthority(),
+ rewrittenURI.getPath(),
+ query,
+ rewrittenURI.getFragment()
+ ).normalize()
+ }
+ rewrittenURI.normalize()
+ }
+
+ def createProxyLocationHeader(
+ prefix: String,
+ headerValue: String,
+ clientRequest: HttpServletRequest,
+ targetUri: URI): String = {
+ val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
+ if (headerValue.startsWith(toReplace)) {
+ clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
+ prefix + headerValue.substring(toReplace.length())
+ } else {
+ null
+ }
+ }
+
// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
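
Taken together, the handler above maps anything under /proxy/<id> onto the registered target and rewrites Location headers so redirects from the proxied UI stay behind the master. A small sketch of the expected rewriting, assuming caller code inside an org.apache.spark package (JettyUtils is private[spark]) and placeholder worker id, host and ports:

    import org.apache.spark.ui.JettyUtils

    val prefix = "/proxy/worker-20160923-0001"   // placeholder worker id
    val target = "http://worker-host:8081"       // placeholder worker UI address

    // A request path under the proxy prefix is re-rooted onto the target:
    //   /proxy/worker-20160923-0001/logPage?logType=stdout -> http://worker-host:8081/logPage?logType=stdout
    val rewritten = JettyUtils.createProxyURI(prefix, target, prefix + "/logPage", "logType=stdout")

    // A redirect issued by the worker back to its own address
    // (Location: http://worker-host:8081/jobs) is rewritten by createProxyLocationHeader
    // to http://<master-host:port>/proxy/worker-20160923-0001/jobs, so the browser
    // keeps talking to the master.
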
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 2b6c538485..c0d1a2220f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {
def getTimeZoneOffset() : Int =
TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60
+
+ /**
+ * Return the correct href, depending on whether the master is running in
+ * reverse proxy mode.
+ */
+ def makeHref(proxy: Boolean, id: String, origHref: String): String = {
+ if (proxy) {
+ s"/proxy/$id"
+ } else {
+ origHref
+ }
+ }
}
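
A brief illustration of makeHref, assuming a caller inside org.apache.spark (UIUtils is private[spark]) and a placeholder worker id and address:

    // With the reverse proxy enabled, UI links resolve relative to the master's /proxy/ prefix;
    // otherwise the original address is returned unchanged.
    UIUtils.makeHref(proxy = true,  id = "worker-1", origHref = "http://10.0.0.5:8081")  // "/proxy/worker-1"
    UIUtils.makeHref(proxy = false, id = "worker-1", origHref = "http://10.0.0.5:8081")  // "http://10.0.0.5:8081"
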
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7cbe4e342e..831a7bcb12 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
}
}
+ test("master/worker web ui available with reverseProxy") {
+ implicit val formats = org.json4s.DefaultFormats
+ val reverseProxyUrl = "http://localhost:8080"
+ val conf = new SparkConf()
+ conf.set("spark.ui.reverseProxy", "true")
+ conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
+ val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ try {
+ eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
+ .getLines().mkString("\n")
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ val JString(workerId) = workerSummaryJson \ "id"
+ val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json"
+ val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl)
+ }
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2b59b48d8b..dbb8dca4c8 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,10 +18,13 @@
package org.apache.spark.ui
import java.net.{BindException, ServerSocket}
+import java.net.URI
+import javax.servlet.http.HttpServletRequest
import scala.io.Source
import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito.{mock, when}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -190,6 +193,40 @@ class UISuite extends SparkFunSuite {
}
}
+ test("verify proxy rewrittenURI") {
+ val prefix = "/proxy/worker-id"
+ val target = "http://localhost:8081"
+ val path = "/proxy/worker-id/json"
+ var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/json")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
+ assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
+ assert(rewrittenURI === null)
+ }
+
+ test("verify rewriting location header for reverse proxy") {
+ val clientRequest = mock(classOf[HttpServletRequest])
+ var headerValue = "http://localhost:4040/jobs"
+ val prefix = "/proxy/worker-id"
+ val targetUri = URI.create("http://localhost:4040")
+ when(clientRequest.getScheme()).thenReturn("http")
+ when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
+ var newHeader = JettyUtils.createProxyLocationHeader(
+ prefix, headerValue, clientRequest, targetUri)
+ assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
+ headerValue = "http://localhost:4041/jobs"
+ newHeader = JettyUtils.createProxyLocationHeader(
+ prefix, headerValue, clientRequest, targetUri)
+ assert(newHeader === null)
+ }
+
def stopServer(info: ServerInfo): Unit = {
if (info != null && info.server != null) info.server.stop
}