aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGurvinder Singh <gurvinder.singh@uninett.no>2016-09-08 17:20:20 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-09-08 17:20:20 -0700
commit92ce8d4849a0341c4636e70821b7be57ad3055b1 (patch)
tree6b02cf84575ae2a4aecd8bb9c8646efe3a41b3b9
parent722afbb2b33037a30d385a15725f2db5365bd375 (diff)
downloadspark-92ce8d4849a0341c4636e70821b7be57ad3055b1.tar.gz
spark-92ce8d4849a0341c4636e70821b7be57ad3055b1.tar.bz2
spark-92ce8d4849a0341c4636e70821b7be57ad3055b1.zip
[SPARK-15487][WEB UI] Spark Master UI to reverse proxy Application and Workers UI
## What changes were proposed in this pull request? This pull request adds the ability to access worker and application UIs through the master UI itself. This helps in accessing the Spark UI when running a Spark cluster in closed networks, e.g. Kubernetes. The cluster admin needs to expose only the Spark master UI; the rest of the UIs can stay in the private network, and the master UI will reverse proxy each connection request to the corresponding resource. It adds the following paths for worker/application UIs: WorkerUI: <http/https>://master-publicIP:<port>/proxy/workerID/ ApplicationUI: <http/https>://master-publicIP:<port>/proxy/appID/ This makes it easy for users to protect access to the Spark master cluster by putting a reverse proxy in front of it, e.g. https://github.com/bitly/oauth2_proxy ## How was this patch tested? The functionality has been tested manually, and there is also a unit test for accessing the worker UI through the reverse proxy address. pwendell bomeng BryanCutler can you please review it, thanks. Author: Gurvinder Singh <gurvinder.singh@uninett.no> Closes #13950 from gurvindersingh/rproxy.
-rw-r--r--core/pom.xml12
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala6
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala85
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala37
-rw-r--r--docs/configuration.md14
-rw-r--r--pom.xml14
-rw-r--r--repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala13
-rw-r--r--repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala13
16 files changed, 287 insertions, 11 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 69a0b0ff27..3c8138f974 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,16 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<scope>compile</scope>
</dependency>
@@ -388,7 +398,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
- guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
+ guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client
</includeArtifactIds>
<silent>true</silent>
</configuration>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4aa795a58a..e32e4aa5b8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -505,6 +505,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
+ if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
+ System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
+ }
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dcf41638e7..8c91aa1516 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -114,6 +114,7 @@ private[deploy] class Master(
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
}
@@ -129,6 +130,11 @@ private[deploy] class Master(
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
+ if (reverseProxy) {
+ masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+ logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
+ s"Applications UIs are available at $masterWebUiUrl")
+ }
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
@@ -755,6 +761,9 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
+ if (reverseProxy) {
+ webUi.addProxyTargets(worker.id, worker.webUiAddress)
+ }
true
}
@@ -763,6 +772,9 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
+ if (reverseProxy) {
+ webUi.removeProxyTargets(worker.id)
+ }
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
@@ -810,6 +822,9 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
+ if (reverseProxy) {
+ webUi.addProxyTargets(app.id, app.desc.appUiUrl)
+ }
}
private def finishApplication(app: ApplicationInfo) {
@@ -823,6 +838,9 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
+ if (reverseProxy) {
+ webUi.removeProxyTargets(app.id)
+ }
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 8875fc2232..17c521cbf9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -77,7 +77,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
<li><strong>State:</strong> {app.state}</li>
{
if (!app.isFinished) {
- <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+ <li><strong>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ app.id, app.desc.appUiUrl)}>Application Detail UI</a>
+ </strong></li>
}
}
</ul>
@@ -100,19 +103,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
}
private def executorRow(executor: ExecutorDesc): Seq[Node] = {
+ val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
+ executor.worker.id, executor.worker.webUiAddress)
<tr>
<td>{executor.id}</td>
<td>
- <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+ <a href={workerUrlRef}>{executor.worker.id}</a>
</td>
<td>{executor.cores}</td>
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ .format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+ .format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 5ed3e39edc..3fb860582c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -176,7 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
- <a href={worker.webUiAddress}>{worker.id}</a>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ worker.id, worker.webUiAddress)}>{worker.id}</a>
</td>
<td>{worker.host}:{worker.port}</td>
<td>{worker.state}</td>
@@ -210,7 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (app.isFinished) {
app.desc.name
} else {
- <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+ <a href={UIUtils.makeHref(parent.master.reverseProxy,
+ app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
}
}
</td>
@@ -244,7 +246,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<tr>
<td>{driver.id} {killLink}</td>
<td>{driver.submitDate}</td>
- <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+ <td>{driver.worker.map(w =>
+ <a href=
+ {UIUtils.makeHref(parent.master.reverseProxy, w.id, w.webUiAddress)}>
+ {w.id.toString}</a>
+ ).getOrElse("None")}
</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a0727ad83f..8cfd0f6829 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,6 +17,10 @@
package org.apache.spark.deploy.master.ui
+import scala.collection.mutable.HashMap
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -34,6 +38,7 @@ class MasterWebUI(
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+ private val proxyHandlers = new HashMap[String, ServletContextHandler]
initialize()
@@ -48,6 +53,17 @@ class MasterWebUI(
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
+
+  /**
+   * Expose the UI running at `target` through this web UI under `/proxy/<id>`.
+   *
+   * A trailing "/" on `target` is dropped before the proxy handler is created. The handler is
+   * attached to this UI and remembered by `id` so that removeProxyTargets can detach it later.
+   *
+   * @param id     identifier used as the path segment after `/proxy/` (a worker or app id)
+   * @param target URL of the UI that requests should be forwarded to
+   */
+  def addProxyTargets(id: String, target: String): Unit = {
+    val endTarget = target.stripSuffix("/")
+    val handler = createProxyHandler("/proxy/" + id, endTarget)
+    attachHandler(handler)
+    proxyHandlers(id) = handler
+  }
+
+  /** Detach and forget the proxy handler registered for `id`; unknown ids are ignored. */
+  def removeProxyTargets(id: String): Unit = {
+    proxyHandlers.remove(id) match {
+      case Some(handler) => detachHandler(handler)
+      case None => // no proxy handler was registered under this id
+    }
+  }
}
private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea..d4d8521cc8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(
// Add webUI log urls
val baseUrl =
- s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
+ } else {
+ s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ }
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 724206bf94..0bedd9a20a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -203,6 +203,9 @@ private[deploy] class Worker(
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
+ }
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 50283f2b74..24f3f75715 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.xml.Node
+import org.eclipse.jetty.client.api.Response
+import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.server.{Request, Server, ServerConnector}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
@@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}
+  /**
+   * Build a context handler, mounted at `prefix`, that reverse proxies requests to `target`
+   * (used for Worker and Application Driver UIs).
+   *
+   * Request URIs are rewritten with createProxyURI, and "Location" response headers with
+   * createProxyLocationHeader; all other response headers go through ProxyServlet's own filter.
+   */
+  def createProxyHandler(
+      prefix: String,
+      target: String): ServletContextHandler = {
+    val proxyServlet = new ProxyServlet {
+      override def rewriteTarget(request: HttpServletRequest): String = {
+        val proxied = createProxyURI(
+          prefix, target, request.getRequestURI(), request.getQueryString())
+        // null is returned when the path is outside `prefix` or the destination is rejected
+        // by validateDestination.
+        if (proxied != null && validateDestination(proxied.getHost(), proxied.getPort())) {
+          proxied.toString()
+        } else {
+          null
+        }
+      }
+
+      override def filterServerResponseHeader(
+          clientRequest: HttpServletRequest,
+          serverResponse: Response,
+          headerName: String,
+          headerValue: String): String = {
+        // Only redirects need to be pointed back at the proxy.
+        val rewrittenLocation =
+          if (headerName.equalsIgnoreCase("location")) {
+            createProxyLocationHeader(
+              prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
+          } else {
+            null
+          }
+        if (rewrittenLocation != null) {
+          rewrittenLocation
+        } else {
+          super.filterServerResponseHeader(
+            clientRequest, serverResponse, headerName, headerValue)
+        }
+      }
+    }
+
+    val proxyContext = new ServletContextHandler
+    proxyContext.setContextPath(prefix)
+    proxyContext.addServlet(new ServletHolder(proxyServlet), "/")
+    proxyContext
+  }
+
/** Add filters, if any, to the given list of ServletContextHandlers */
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
@@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}
+  /**
+   * Translate a request path under the proxy `prefix` into the matching URI on `target`.
+   *
+   * The part of `path` that follows `prefix` is appended to `target` (inserting a "/" when the
+   * remainder does not start with one) and the result is normalized. `path` and `query` are
+   * taken in their raw, already percent-encoded form and are not re-encoded.
+   *
+   * @param prefix proxy path prefix, e.g. "/proxy/worker-id"
+   * @param target base URL of the proxied UI, e.g. "http://localhost:8081"
+   * @param path   request path as received by the proxy
+   * @param query  raw query string, or null when the request has none
+   * @return the rewritten URI, or null when `path` does not start with `prefix`
+   */
+  def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
+    if (!path.startsWith(prefix)) {
+      null
+    } else {
+      val uri = new StringBuilder(target)
+      val rest = path.substring(prefix.length())
+
+      if (!rest.isEmpty()) {
+        if (!rest.startsWith("/")) {
+          uri.append("/")
+        }
+        uri.append(rest)
+      }
+
+      val rewrittenURI = URI.create(uri.toString())
+      if (query == null) {
+        rewrittenURI.normalize()
+      } else {
+        // Append the query in its raw form. The multi-argument URI constructors always quote
+        // '%', so passing them the decoded path and the already-encoded query would drop
+        // escapes from the path and turn e.g. "a=%20b" into "a=%2520b".
+        try {
+          URI.create(s"${uri}?$query").normalize()
+        } catch {
+          // The raw query holds characters that are illegal in a URI; fall back to the quoting
+          // constructor so that such requests keep being proxied as before.
+          case _: IllegalArgumentException =>
+            new URI(
+              rewrittenURI.getScheme(),
+              rewrittenURI.getAuthority(),
+              rewrittenURI.getPath(),
+              query,
+              rewrittenURI.getFragment()
+            ).normalize()
+        }
+      }
+    }
+  }
+
+  /**
+   * Rewrite a "Location" header so that a redirect issued by the proxied server points back at
+   * the proxy.
+   *
+   * When `headerValue` starts with the scheme and authority of `targetUri`, that part is
+   * replaced by the client-facing scheme and "host" header followed by `prefix`.
+   *
+   * @return the rewritten header value, or null when `headerValue` does not start with the
+   *         scheme and authority of `targetUri`
+   */
+  def createProxyLocationHeader(
+      prefix: String,
+      headerValue: String,
+      clientRequest: HttpServletRequest,
+      targetUri: URI): String = {
+    val upstreamOrigin = s"${targetUri.getScheme()}://${targetUri.getAuthority()}"
+    if (!headerValue.startsWith(upstreamOrigin)) {
+      null
+    } else {
+      val clientOrigin = s"${clientRequest.getScheme()}://${clientRequest.getHeader("host")}"
+      clientOrigin + prefix + headerValue.substring(upstreamOrigin.length())
+    }
+  }
+
// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 2b6c538485..c0d1a2220f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {
def getTimeZoneOffset() : Int =
TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60
+
+  /**
+   * Pick the link target for a worker or application UI: the `/proxy/<id>` path when the
+   * master runs in reverse proxy mode (`proxy` is true), otherwise `origHref` unchanged.
+   */
+  def makeHref(proxy: Boolean, id: String, origHref: String): String =
+    if (!proxy) origHref else s"/proxy/$id"
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7cbe4e342e..831a7bcb12 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
}
}
+  test("master/worker web ui available with reverseProxy") {
+    implicit val formats = org.json4s.DefaultFormats
+    val proxyUrl = "http://localhost:8080"
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.ui.reverseProxy", "true")
+    sparkConf.set("spark.ui.reverseProxyUrl", proxyUrl)
+    val cluster = new LocalSparkCluster(2, 2, 512, sparkConf)
+    cluster.start()
+
+    // Reads the whole response body served at `url` as one string.
+    def fetch(url: String): String = Source.fromURL(url).getLines().mkString("\n")
+
+    try {
+      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+        val masterUi = s"http://localhost:${cluster.masterWebUIPort}"
+        val JArray(workers) = (parse(fetch(s"$masterUi/json")) \ "workers")
+        workers.size should be (2)
+        // Each worker's JSON endpoint must be reachable through the master's /proxy/<id> path
+        // and report the configured reverse proxy URL as its master web UI URL.
+        for (workerSummary <- workers) {
+          val JString(workerId) = workerSummary \ "id"
+          val proxied = parse(fetch(s"$masterUi/proxy/${workerId}/json"))
+          (proxied \ "cores").extract[Int] should be (2)
+          (proxied \ "masterwebuiurl").extract[String] should be (proxyUrl)
+        }
+      }
+    } finally {
+      cluster.stop()
+    }
+  }
+
test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2b59b48d8b..dbb8dca4c8 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,10 +18,13 @@
package org.apache.spark.ui
import java.net.{BindException, ServerSocket}
+import java.net.URI
+import javax.servlet.http.HttpServletRequest
import scala.io.Source
import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito.{mock, when}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -190,6 +193,40 @@ class UISuite extends SparkFunSuite {
}
}
+  test("verify proxy rewrittenURI") {
+    val prefix = "/proxy/worker-id"
+    val target = "http://localhost:8081"
+    def rewrite(path: String, query: String): URI =
+      JettyUtils.createProxyURI(prefix, target, path, query)
+
+    // Paths under the prefix are mapped onto the target, with or without a query string.
+    assert(rewrite("/proxy/worker-id/json", null).toString() === "http://localhost:8081/json")
+    assert(rewrite("/proxy/worker-id/json", "test=done").toString() ===
+      "http://localhost:8081/json?test=done")
+    assert(rewrite("/proxy/worker-id", null).toString() === "http://localhost:8081")
+    // Percent-encoded path segments are passed through unchanged.
+    assert(rewrite("/proxy/worker-id/test%2F", null).toString() ===
+      "http://localhost:8081/test%2F")
+    assert(rewrite("/proxy/worker-id/%F0%9F%98%84", null).toString() ===
+      "http://localhost:8081/%F0%9F%98%84")
+    // A path outside the prefix is not rewritten.
+    assert(rewrite("/proxy/worker-noid/json", null) === null)
+  }
+
+  test("verify rewriting location header for reverse proxy") {
+    val prefix = "/proxy/worker-id"
+    val targetUri = URI.create("http://localhost:4040")
+    val clientRequest = mock(classOf[HttpServletRequest])
+    when(clientRequest.getScheme()).thenReturn("http")
+    when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
+
+    // A redirect to the proxied server is rewritten to go through the proxy prefix.
+    val rewritten = JettyUtils.createProxyLocationHeader(
+      prefix, "http://localhost:4040/jobs", clientRequest, targetUri)
+    assert(rewritten.toString() === "http://localhost:8080/proxy/worker-id/jobs")
+
+    // A redirect to any other authority is left for the default header handling.
+    val notRewritten = JettyUtils.createProxyLocationHeader(
+      prefix, "http://localhost:4041/jobs", clientRequest, targetUri)
+    assert(notRewritten === null)
+  }
+
def stopServer(info: ServerInfo): Unit = {
if (info != null && info.server != null) info.server.stop
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 6e98f67b73..ebd0aa796d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -658,6 +658,20 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.ui.reverseProxy</code></td>
+ <td>false</td>
+ <td>
+ Enable running Spark Master as a reverse proxy for worker and application UIs. In this mode, Spark master will reverse proxy the worker and application UIs to enable access without requiring direct access to their hosts. Use it with caution: worker and application UIs will not be accessible directly; you will only be able to access them through the Spark master/proxy public URL. This setting affects all the workers and application UIs running in the cluster and must be set on all the workers, drivers and masters.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.ui.reverseProxyUrl</code></td>
+ <td></td>
+ <td>
+ This is the URL where your proxy is running, i.e. the proxy that runs in front of the Spark Master. This is useful when running a proxy for authentication, e.g. an OAuth proxy. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy.
+ </td>
+</tr>
+<tr>
<td><code>spark.worker.ui.retainedExecutors</code></td>
<td>1000</td>
<td>
diff --git a/pom.xml b/pom.xml
index e6c28977ca..3b3ad39b47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -340,6 +340,18 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ <version>${jetty.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <version>${jetty.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
<scope>provided</scope>
@@ -2256,6 +2268,8 @@
<include>org.spark-project.spark:unused</include>
<include>org.eclipse.jetty:jetty-io</include>
<include>org.eclipse.jetty:jetty-http</include>
+ <include>org.eclipse.jetty:jetty-proxy</include>
+ <include>org.eclipse.jetty:jetty-client</include>
<include>org.eclipse.jetty:jetty-continuation</include>
<include>org.eclipse.jetty:jetty-servlet</include>
<include>org.eclipse.jetty:jetty-servlets</include>
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 29f63de8a0..b2a61260c2 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -126,7 +126,18 @@ private[repl] trait SparkILoopInit {
@transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
@transient val sc = {
val _sc = spark.sparkContext
- _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
+ if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+ val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+ if (proxyUrl != null) {
+ println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+ } else {
+ println(s"Spark Context Web UI is available at Spark Master Public URL")
+ }
+ } else {
+ _sc.uiWebUrl.foreach {
+ webUrl => println(s"Spark context Web UI available at ${webUrl}")
+ }
+ }
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 2707b0847a..76a66c1bea 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -43,7 +43,18 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
}
@transient val sc = {
val _sc = spark.sparkContext
- _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
+ if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+ val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+ if (proxyUrl != null) {
+ println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+ } else {
+ println(s"Spark Context Web UI is available at Spark Master Public URL")
+ }
+ } else {
+ _sc.uiWebUrl.foreach {
+ webUrl => println(s"Spark context Web UI available at ${webUrl}")
+ }
+ }
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")