aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorscwf <wangfei1@huawei.com>2016-01-19 14:49:55 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-01-19 14:49:55 -0800
commit43f1d59e17d89d19b322d639c5069a3fc0c8e2ed (patch)
tree9a06aeeeb1345364957ae64bdabedb8381960068 /core
parentefd7eed3222799d66d4fcb68785142dc570c8150 (diff)
downloadspark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.tar.gz
spark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.tar.bz2
spark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.zip
[SPARK-2750][WEB UI] Add https support to the Web UI
Author: scwf <wangfei1@huawei.com> Author: Marcelo Vanzin <vanzin@cloudera.com> Author: WangTaoTheTonic <wangtao111@huawei.com> Author: w00228970 <wangfei1@huawei.com> Closes #10238 from vanzin/SPARK-2750.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SSLOptions.scala50
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala79
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala2
-rw-r--r--core/src/test/resources/spark.keystorebin0 -> 1383 bytes
-rw-r--r--core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala22
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala135
20 files changed, 273 insertions, 87 deletions
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 3b9c885bf9..261265f0b4 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
+ * @param keyStoreType the type of the key-store
+ * @param needClientAuth set true if SSL needs client authentication
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
+ * @param trustStoreType the type of the trust-store
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
@@ -49,8 +52,11 @@ private[spark] case class SSLOptions(
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
+ keyStoreType: Option[String] = None,
+ needClientAuth: Boolean = false,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
+ trustStoreType: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
@@ -63,12 +69,18 @@ private[spark] case class SSLOptions(
val sslContextFactory = new SslContextFactory()
keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
- trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
- trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+ keyStoreType.foreach(sslContextFactory.setKeyStoreType)
+ if (needClientAuth) {
+ trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+ trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+ trustStoreType.foreach(sslContextFactory.setTrustStoreType)
+ }
protocol.foreach(sslContextFactory.setProtocol)
- sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+ if (supportedAlgorithms.nonEmpty) {
+ sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+ }
Some(sslContextFactory)
} else {
@@ -82,6 +94,13 @@ private[spark] case class SSLOptions(
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
+ if (keyStoreType.isDefined) {
+ logWarning("Akka configuration does not support key store type.");
+ }
+ if (trustStoreType.isDefined) {
+ logWarning("Akka configuration does not support trust store type.");
+ }
+
Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
@@ -110,7 +129,9 @@ private[spark] case class SSLOptions(
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
*/
- private val supportedAlgorithms: Set[String] = {
+ private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
+ Set()
+ } else {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
@@ -133,7 +154,11 @@ private[spark] case class SSLOptions(
logDebug(s"Discarding unsupported cipher $cipher")
}
- enabledAlgorithms & providerAlgorithms
+ val supported = enabledAlgorithms & providerAlgorithms
+ require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"),
+ "SSLContext does not support any of the enabled algorithms: " +
+ enabledAlgorithms.mkString(","))
+ supported
}
/** Returns a string representation of this SSLOptions with all the passwords masked. */
@@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
+ * $ - `[ns].keyStoreType` - the type of the key-store
+ * $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
+ * $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
@@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging {
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))
+ val keyStoreType = conf.getOption(s"$ns.keyStoreType")
+ .orElse(defaults.flatMap(_.keyStoreType))
+
+ val needClientAuth =
+ conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))
+
val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))
val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))
+ val trustStoreType = conf.getOption(s"$ns.trustStoreType")
+ .orElse(defaults.flatMap(_.trustStoreType))
+
val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))
@@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging {
keyStore,
keyStorePassword,
keyPassword,
+ keyStoreType,
+ needClientAuth,
trustStore,
trustStorePassword,
+ trustStoreType,
protocol,
enabledAlgorithms)
}
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 64e483e384..c5aec05c03 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
- // SSL configuration for different communication layers - they can override the default
- // configuration at a specified namespace. The namespace *must* start with spark.ssl.
- val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
- val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
-
- logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
- logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
-
+ // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
+ val fileServerSSLOptions = getSSLOptions("fs")
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
for (trustStore <- fileServerSSLOptions.trustStore) yield {
@@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
(None, None)
}
+ def getSSLOptions(module: String): SSLOptions = {
+ val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
+ logDebug(s"Created SSL options for $module: $opts")
+ opts
+ }
+
/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 3feb7cea59..3e78c7ae24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -41,8 +41,7 @@ private[deploy] object DeployMessages {
worker: RpcEndpointRef,
cores: Int,
memory: Int,
- webUiPort: Int,
- publicAddress: String)
+ workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 96007a06e3..1f13d7db34 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -49,7 +49,8 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
- extends WebUI(securityManager, port, conf) with Logging with UIRoot {
+ extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
+ with Logging with UIRoot {
// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -233,7 +234,7 @@ object HistoryServer extends Logging {
val UI_PATH_PREFIX = "/history"
- def main(argStrings: Array[String]) {
+ def main(argStrings: Array[String]): Unit = {
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0deab8ddd5..202a1b787c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -383,7 +383,7 @@ private[deploy] class Master(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
- id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+ id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -392,7 +392,7 @@ private[deploy] class Master(
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
- workerRef, workerUiPort, publicAddress)
+ workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index f751966605..4e20c10fd1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -29,8 +29,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
- val webUiPort: Int,
- val publicAddress: String)
+ val webUiAddress: String)
extends Serializable {
Utils.checkHost(host, "Expected hostname")
@@ -98,10 +97,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}
- def webUiAddress : String = {
- "http://" + this.publicAddress + ":" + this.webUiPort
- }
-
def setState(state: WorkerState.Value): Unit = {
this.state = state
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 750ef0a962..d7543926f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -32,8 +32,8 @@ class MasterWebUI(
val master: Master,
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
- extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
- with UIRoot {
+ extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
+ requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index da9740bb41..baad098a0c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -31,7 +31,7 @@ private[spark] class MesosClusterUI(
conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
- extends WebUI(securityManager, port, conf) {
+ extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
initialize()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98e17da489..179d3b9f20 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,7 @@ private[deploy] class Worker(
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
+ private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
@@ -184,6 +185,9 @@ private[deploy] class Worker(
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
+
+ val scheme = if (webUi.sslOptions.enabled) "https" else "http"
+ workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
registerWithMaster()
metricsSystem.registerSource(workerSource)
@@ -336,7 +340,7 @@ private[deploy] class Worker(
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
- workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+ workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 1a0598e50d..b45b682494 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -34,7 +34,8 @@ class WorkerWebUI(
val worker: Worker,
val workDir: File,
requestedPort: Int)
- extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
+ extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
+ requestedPort, worker.conf, name = "WorkerUI")
with Logging {
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index b796a44fe0..bc143b7de3 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,21 +17,24 @@
package org.apache.spark.ui
-import java.net.{InetSocketAddress, URL}
+import java.net.{URI, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+import scala.collection.mutable.{ArrayBuffer, StringBuilder}
import scala.language.implicitConversions
import scala.xml.Node
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{Connector, Request, Server}
import org.eclipse.jetty.server.handler._
+import org.eclipse.jetty.server.nio.SelectChannelConnector
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.util.Utils
/**
@@ -224,23 +227,51 @@ private[spark] object JettyUtils extends Logging {
def startJettyServer(
hostName: String,
port: Int,
+ sslOptions: SSLOptions,
handlers: Seq[ServletContextHandler],
conf: SparkConf,
serverName: String = ""): ServerInfo = {
+ val collection = new ContextHandlerCollection
addFilters(handlers, conf)
- val collection = new ContextHandlerCollection
val gzipHandlers = handlers.map { h =>
val gzipHandler = new GzipHandler
gzipHandler.setHandler(h)
gzipHandler
}
- collection.setHandlers(gzipHandlers.toArray)
// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
- val server = new Server(new InetSocketAddress(hostName, currentPort))
+ val server = new Server
+ val connectors = new ArrayBuffer[Connector]
+ // Create a connector on port currentPort to listen for HTTP requests
+ val httpConnector = new SelectChannelConnector()
+ httpConnector.setPort(currentPort)
+ connectors += httpConnector
+
+ sslOptions.createJettySslContextFactory().foreach { factory =>
+ // If the new port wraps around, do not try a privileged port.
+ val securePort =
+ if (currentPort != 0) {
+ (currentPort + 400 - 1024) % (65536 - 1024) + 1024
+ } else {
+ 0
+ }
+ val scheme = "https"
+ // Create a connector on port securePort to listen for HTTPS requests
+ val connector = new SslSelectChannelConnector(factory)
+ connector.setPort(securePort)
+ connectors += connector
+
+ // redirect the HTTP requests to HTTPS port
+ collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
+ }
+
+ gzipHandlers.foreach(collection.addHandler)
+ connectors.foreach(_.setHost(hostName))
+ server.setConnectors(connectors.toArray)
+
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
@@ -262,6 +293,42 @@ private[spark] object JettyUtils extends Logging {
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
ServerInfo(server, boundPort, collection)
}
+
+ private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
+ val redirectHandler: ContextHandler = new ContextHandler
+ redirectHandler.setContextPath("/")
+ redirectHandler.setHandler(new AbstractHandler {
+ override def handle(
+ target: String,
+ baseRequest: Request,
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ if (baseRequest.isSecure) {
+ return
+ }
+ val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
+ baseRequest.getRequestURI, baseRequest.getQueryString)
+ response.setContentLength(0)
+ response.encodeRedirectURL(httpsURI)
+ response.sendRedirect(httpsURI)
+ baseRequest.setHandled(true)
+ }
+ })
+ redirectHandler
+ }
+
+ // Create a new URI from the arguments, handling IPv6 host encoding and default ports.
+ private def createRedirectURI(
+ scheme: String, server: String, port: Int, path: String, query: String) = {
+ val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
+ s"[${server}]"
+ } else {
+ server
+ }
+ val authority = s"$redirectServer:$port"
+ new URI(scheme, authority, path, query, null).toString
+ }
+
}
private[spark] case class ServerInfo(
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index e319937702..eb53aa8e23 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -50,7 +50,8 @@ private[spark] class SparkUI private (
var appName: String,
val basePath: String,
val startTime: Long)
- extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
+ extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
+ conf, basePath, "SparkUI")
with Logging
with UIRoot {
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 3925235984..fe4949b9f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -26,7 +26,7 @@ import scala.xml.Node
import org.eclipse.jetty.servlet.ServletContextHandler
import org.json4s.JsonAST.{JNothing, JValue}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,6 +38,7 @@ import org.apache.spark.util.Utils
*/
private[spark] abstract class WebUI(
val securityManager: SecurityManager,
+ val sslOptions: SSLOptions,
port: Int,
conf: SparkConf,
basePath: String = "",
@@ -133,7 +134,7 @@ private[spark] abstract class WebUI(
def bind() {
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
+ serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f2d93edd4f..3f4ac9b2f1 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -86,7 +86,7 @@ private[spark] object AkkaUtils extends Logging {
val secureCookie = if (isAuthOn) secretKey else ""
logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
- val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+ val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
.getOrElse(ConfigFactory.empty())
val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore
new file mode 100644
index 0000000000..f30716b57b
--- /dev/null
+++ b/core/src/test/resources/spark.keystore
Binary files differ
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index e0226803bb..7603cef773 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,9 +181,10 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
val securityManager = new SecurityManager(conf)
+ val akkaSSLOptions = securityManager.getSSLOptions("akka")
assert(securityManager.fileServerSSLOptions.enabled === true)
- assert(securityManager.akkaSSLOptions.enabled === true)
+ assert(akkaSSLOptions.enabled === true)
assert(securityManager.sslSocketFactory.isDefined === true)
assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,15 +199,15 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
- assert(securityManager.akkaSSLOptions.trustStore.isDefined === true)
- assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore")
- assert(securityManager.akkaSSLOptions.keyStore.isDefined === true)
- assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore")
- assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password"))
- assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password"))
- assert(securityManager.akkaSSLOptions.keyPassword === Some("password"))
- assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1.2"))
- assert(securityManager.akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
+ assert(akkaSSLOptions.trustStore.isDefined === true)
+ assert(akkaSSLOptions.trustStore.get.getName === "truststore")
+ assert(akkaSSLOptions.keyStore.isDefined === true)
+ assert(akkaSSLOptions.keyStore.get.getName === "keystore")
+ assert(akkaSSLOptions.trustStorePassword === Some("password"))
+ assert(akkaSSLOptions.keyStorePassword === Some("password"))
+ assert(akkaSSLOptions.keyPassword === Some("password"))
+ assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
+ assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
}
test("ssl off setup") {
@@ -218,7 +219,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
val securityManager = new SecurityManager(conf)
assert(securityManager.fileServerSSLOptions.enabled === false)
- assert(securityManager.akkaSSLOptions.enabled === false)
assert(securityManager.sslSocketFactory.isDefined === false)
assert(securityManager.hostnameVerifier.isDefined === false)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 86455a13d0..190e4dd728 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -50,7 +50,7 @@ private[deploy] object DeployTestUtils {
createDriverDesc(), new Date())
def createWorkerInfo(): WorkerInfo = {
- val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+ val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 10e33a32ba..ce00807ea4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -90,8 +90,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
cores = 0,
memory = 0,
endpoint = null,
- webUiPort = 0,
- publicAddress = ""
+ webUiAddress = "http://localhost:80"
)
val (rpcEnv, _, _) =
@@ -376,7 +375,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
val workerId = System.currentTimeMillis.toString
- new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
+ new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, "http://localhost:80")
}
private def scheduleExecutorsOnWorkers(
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index b4deed7f87..62fe0eaedf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -88,9 +88,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
cores = 0,
memory = 0,
endpoint = workerEndpoint,
- webUiPort = 0,
- publicAddress = ""
- )
+ webUiAddress = "http://localhost:80")
persistenceEngine.addWorker(workerToPersist)
@@ -109,8 +107,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
assert(workerToPersist.cores === recoveryWorkerInfo.cores)
assert(workerToPersist.memory === recoveryWorkerInfo.memory)
assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
- assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
- assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
+ assert(workerToPersist.webUiAddress === recoveryWorkerInfo.webUiAddress)
} finally {
testRpcEnv.shutdown()
testRpcEnv.awaitTermination()
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2d28b67ef2..69c46058f1 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -17,16 +17,16 @@
package org.apache.spark.ui
-import java.net.ServerSocket
+import java.net.{BindException, ServerSocket}
import scala.io.Source
-import scala.util.{Failure, Success, Try}
+import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.LocalSparkContext._
class UISuite extends SparkFunSuite {
@@ -45,6 +45,20 @@ class UISuite extends SparkFunSuite {
sc
}
+ private def sslDisabledConf(): (SparkConf, SSLOptions) = {
+ val conf = new SparkConf
+ (conf, new SecurityManager(conf).getSSLOptions("ui"))
+ }
+
+ private def sslEnabledConf(): (SparkConf, SSLOptions) = {
+ val conf = new SparkConf()
+ .set("spark.ssl.ui.enabled", "true")
+ .set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore")
+ .set("spark.ssl.ui.keyStorePassword", "123456")
+ .set("spark.ssl.ui.keyPassword", "123456")
+ (conf, new SecurityManager(conf).getSSLOptions("ui"))
+ }
+
ignore("basic ui visibility") {
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
@@ -70,33 +84,92 @@ class UISuite extends SparkFunSuite {
}
test("jetty selects different port under contention") {
- val server = new ServerSocket(0)
- val startPort = server.getLocalPort
- val serverInfo1 = JettyUtils.startJettyServer(
- "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
- val serverInfo2 = JettyUtils.startJettyServer(
- "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
- // Allow some wiggle room in case ports on the machine are under contention
- val boundPort1 = serverInfo1.boundPort
- val boundPort2 = serverInfo2.boundPort
- assert(boundPort1 != startPort)
- assert(boundPort2 != startPort)
- assert(boundPort1 != boundPort2)
- serverInfo1.server.stop()
- serverInfo2.server.stop()
- server.close()
+ var server: ServerSocket = null
+ var serverInfo1: ServerInfo = null
+ var serverInfo2: ServerInfo = null
+ val (conf, sslOptions) = sslDisabledConf()
+ try {
+ server = new ServerSocket(0)
+ val startPort = server.getLocalPort
+ serverInfo1 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf)
+ serverInfo2 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf)
+ // Allow some wiggle room in case ports on the machine are under contention
+ val boundPort1 = serverInfo1.boundPort
+ val boundPort2 = serverInfo2.boundPort
+ assert(boundPort1 != startPort)
+ assert(boundPort2 != startPort)
+ assert(boundPort1 != boundPort2)
+ } finally {
+ stopServer(serverInfo1)
+ stopServer(serverInfo2)
+ closeSocket(server)
+ }
+ }
+
+ test("jetty with https selects different port under contention") {
+ var server: ServerSocket = null
+ var serverInfo1: ServerInfo = null
+ var serverInfo2: ServerInfo = null
+ try {
+ server = new ServerSocket(0)
+ val startPort = server.getLocalPort
+ val (conf, sslOptions) = sslEnabledConf()
+ serverInfo1 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server1")
+ serverInfo2 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server2")
+ // Allow some wiggle room in case ports on the machine are under contention
+ val boundPort1 = serverInfo1.boundPort
+ val boundPort2 = serverInfo2.boundPort
+ assert(boundPort1 != startPort)
+ assert(boundPort2 != startPort)
+ assert(boundPort1 != boundPort2)
+ } finally {
+ stopServer(serverInfo1)
+ stopServer(serverInfo2)
+ closeSocket(server)
+ }
}
test("jetty binds to port 0 correctly") {
- val serverInfo = JettyUtils.startJettyServer(
- "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
- val server = serverInfo.server
- val boundPort = serverInfo.boundPort
- assert(server.getState === "STARTED")
- assert(boundPort != 0)
- Try { new ServerSocket(boundPort) } match {
- case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
- case Failure(e) =>
+ var socket: ServerSocket = null
+ var serverInfo: ServerInfo = null
+ val (conf, sslOptions) = sslDisabledConf()
+ try {
+ serverInfo = JettyUtils.startJettyServer(
+ "0.0.0.0", 0, sslOptions, Seq[ServletContextHandler](), conf)
+ val server = serverInfo.server
+ val boundPort = serverInfo.boundPort
+ assert(server.getState === "STARTED")
+ assert(boundPort != 0)
+ intercept[BindException] {
+ socket = new ServerSocket(boundPort)
+ }
+ } finally {
+ stopServer(serverInfo)
+ closeSocket(socket)
+ }
+ }
+
+ test("jetty with https binds to port 0 correctly") {
+ var socket: ServerSocket = null
+ var serverInfo: ServerInfo = null
+ try {
+ val (conf, sslOptions) = sslEnabledConf()
+ serverInfo = JettyUtils.startJettyServer(
+ "0.0.0.0", 0, sslOptions, Seq[ServletContextHandler](), conf)
+ val server = serverInfo.server
+ val boundPort = serverInfo.boundPort
+ assert(server.getState === "STARTED")
+ assert(boundPort != 0)
+ intercept[BindException] {
+ socket = new ServerSocket(boundPort)
+ }
+ } finally {
+ stopServer(serverInfo)
+ closeSocket(socket)
}
}
@@ -117,4 +190,12 @@ class UISuite extends SparkFunSuite {
assert(splitUIAddress(2).toInt == boundPort)
}
}
+
+ def stopServer(info: ServerInfo): Unit = {
+ if (info != null && info.server != null) info.server.stop
+ }
+
+ def closeSocket(socket: ServerSocket): Unit = {
+ if (socket != null) socket.close
+ }
}