aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/SSLOptions.scala50
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala79
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala2
14 files changed, 149 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 3b9c885bf9..261265f0b4 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
+ * @param keyStoreType the type of the key-store
+ * @param needClientAuth set true if SSL needs client authentication
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
+ * @param trustStoreType the type of the trust-store
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
@@ -49,8 +52,11 @@ private[spark] case class SSLOptions(
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
+ keyStoreType: Option[String] = None,
+ needClientAuth: Boolean = false,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
+ trustStoreType: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
@@ -63,12 +69,18 @@ private[spark] case class SSLOptions(
val sslContextFactory = new SslContextFactory()
keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
- trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
- trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+ keyStoreType.foreach(sslContextFactory.setKeyStoreType)
+ if (needClientAuth) {
+ trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+ trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+ trustStoreType.foreach(sslContextFactory.setTrustStoreType)
+ }
protocol.foreach(sslContextFactory.setProtocol)
- sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+ if (supportedAlgorithms.nonEmpty) {
+ sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
+ }
Some(sslContextFactory)
} else {
@@ -82,6 +94,13 @@ private[spark] case class SSLOptions(
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
+ if (keyStoreType.isDefined) {
+ logWarning("Akka configuration does not support key store type.");
+ }
+ if (trustStoreType.isDefined) {
+ logWarning("Akka configuration does not support trust store type.");
+ }
+
Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
@@ -110,7 +129,9 @@ private[spark] case class SSLOptions(
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
*/
- private val supportedAlgorithms: Set[String] = {
+ private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
+ Set()
+ } else {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
@@ -133,7 +154,11 @@ private[spark] case class SSLOptions(
logDebug(s"Discarding unsupported cipher $cipher")
}
- enabledAlgorithms & providerAlgorithms
+ val supported = enabledAlgorithms & providerAlgorithms
+ require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"),
+ "SSLContext does not support any of the enabled algorithms: " +
+ enabledAlgorithms.mkString(","))
+ supported
}
/** Returns a string representation of this SSLOptions with all the passwords masked. */
@@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
+ * $ - `[ns].keyStoreType` - the type of the key-store
+ * $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
+ * $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
@@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging {
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))
+ val keyStoreType = conf.getOption(s"$ns.keyStoreType")
+ .orElse(defaults.flatMap(_.keyStoreType))
+
+ val needClientAuth =
+ conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))
+
val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))
val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))
+ val trustStoreType = conf.getOption(s"$ns.trustStoreType")
+ .orElse(defaults.flatMap(_.trustStoreType))
+
val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))
@@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging {
keyStore,
keyStorePassword,
keyPassword,
+ keyStoreType,
+ needClientAuth,
trustStore,
trustStorePassword,
+ trustStoreType,
protocol,
enabledAlgorithms)
}
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 64e483e384..c5aec05c03 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
- // SSL configuration for different communication layers - they can override the default
- // configuration at a specified namespace. The namespace *must* start with spark.ssl.
- val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
- val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
-
- logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
- logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
-
+ // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
+ val fileServerSSLOptions = getSSLOptions("fs")
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
for (trustStore <- fileServerSSLOptions.trustStore) yield {
@@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
(None, None)
}
+ def getSSLOptions(module: String): SSLOptions = {
+ val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
+ logDebug(s"Created SSL options for $module: $opts")
+ opts
+ }
+
/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 3feb7cea59..3e78c7ae24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -41,8 +41,7 @@ private[deploy] object DeployMessages {
worker: RpcEndpointRef,
cores: Int,
memory: Int,
- webUiPort: Int,
- publicAddress: String)
+ workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 96007a06e3..1f13d7db34 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -49,7 +49,8 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
- extends WebUI(securityManager, port, conf) with Logging with UIRoot {
+ extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
+ with Logging with UIRoot {
// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -233,7 +234,7 @@ object HistoryServer extends Logging {
val UI_PATH_PREFIX = "/history"
- def main(argStrings: Array[String]) {
+ def main(argStrings: Array[String]): Unit = {
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0deab8ddd5..202a1b787c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -383,7 +383,7 @@ private[deploy] class Master(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
- id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+ id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -392,7 +392,7 @@ private[deploy] class Master(
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
- workerRef, workerUiPort, publicAddress)
+ workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index f751966605..4e20c10fd1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -29,8 +29,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
- val webUiPort: Int,
- val publicAddress: String)
+ val webUiAddress: String)
extends Serializable {
Utils.checkHost(host, "Expected hostname")
@@ -98,10 +97,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}
- def webUiAddress : String = {
- "http://" + this.publicAddress + ":" + this.webUiPort
- }
-
def setState(state: WorkerState.Value): Unit = {
this.state = state
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 750ef0a962..d7543926f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -32,8 +32,8 @@ class MasterWebUI(
val master: Master,
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
- extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
- with UIRoot {
+ extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
+ requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index da9740bb41..baad098a0c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -31,7 +31,7 @@ private[spark] class MesosClusterUI(
conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
- extends WebUI(securityManager, port, conf) {
+ extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
initialize()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98e17da489..179d3b9f20 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,7 @@ private[deploy] class Worker(
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
+ private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
@@ -184,6 +185,9 @@ private[deploy] class Worker(
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
+
+ val scheme = if (webUi.sslOptions.enabled) "https" else "http"
+ workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
registerWithMaster()
metricsSystem.registerSource(workerSource)
@@ -336,7 +340,7 @@ private[deploy] class Worker(
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
- workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+ workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 1a0598e50d..b45b682494 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -34,7 +34,8 @@ class WorkerWebUI(
val worker: Worker,
val workDir: File,
requestedPort: Int)
- extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
+ extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
+ requestedPort, worker.conf, name = "WorkerUI")
with Logging {
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index b796a44fe0..bc143b7de3 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,21 +17,24 @@
package org.apache.spark.ui
-import java.net.{InetSocketAddress, URL}
+import java.net.{URI, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+import scala.collection.mutable.{ArrayBuffer, StringBuilder}
import scala.language.implicitConversions
import scala.xml.Node
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{Connector, Request, Server}
import org.eclipse.jetty.server.handler._
+import org.eclipse.jetty.server.nio.SelectChannelConnector
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.util.Utils
/**
@@ -224,23 +227,51 @@ private[spark] object JettyUtils extends Logging {
def startJettyServer(
hostName: String,
port: Int,
+ sslOptions: SSLOptions,
handlers: Seq[ServletContextHandler],
conf: SparkConf,
serverName: String = ""): ServerInfo = {
+ val collection = new ContextHandlerCollection
addFilters(handlers, conf)
- val collection = new ContextHandlerCollection
val gzipHandlers = handlers.map { h =>
val gzipHandler = new GzipHandler
gzipHandler.setHandler(h)
gzipHandler
}
- collection.setHandlers(gzipHandlers.toArray)
// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
- val server = new Server(new InetSocketAddress(hostName, currentPort))
+ val server = new Server
+ val connectors = new ArrayBuffer[Connector]
+ // Create a connector on port currentPort to listen for HTTP requests
+ val httpConnector = new SelectChannelConnector()
+ httpConnector.setPort(currentPort)
+ connectors += httpConnector
+
+ sslOptions.createJettySslContextFactory().foreach { factory =>
+ // If the new port wraps around, do not try a privileged port.
+ val securePort =
+ if (currentPort != 0) {
+ (currentPort + 400 - 1024) % (65536 - 1024) + 1024
+ } else {
+ 0
+ }
+ val scheme = "https"
+ // Create a connector on port securePort to listen for HTTPS requests
+ val connector = new SslSelectChannelConnector(factory)
+ connector.setPort(securePort)
+ connectors += connector
+
+ // redirect the HTTP requests to HTTPS port
+ collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
+ }
+
+ gzipHandlers.foreach(collection.addHandler)
+ connectors.foreach(_.setHost(hostName))
+ server.setConnectors(connectors.toArray)
+
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
@@ -262,6 +293,42 @@ private[spark] object JettyUtils extends Logging {
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
ServerInfo(server, boundPort, collection)
}
+
+ private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
+ val redirectHandler: ContextHandler = new ContextHandler
+ redirectHandler.setContextPath("/")
+ redirectHandler.setHandler(new AbstractHandler {
+ override def handle(
+ target: String,
+ baseRequest: Request,
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ if (baseRequest.isSecure) {
+ return
+ }
+ val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
+ baseRequest.getRequestURI, baseRequest.getQueryString)
+ response.setContentLength(0)
+ response.encodeRedirectURL(httpsURI)
+ response.sendRedirect(httpsURI)
+ baseRequest.setHandled(true)
+ }
+ })
+ redirectHandler
+ }
+
+ // Create a new URI from the arguments, handling IPv6 host encoding and default ports.
+ private def createRedirectURI(
+ scheme: String, server: String, port: Int, path: String, query: String) = {
+ val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
+ s"[${server}]"
+ } else {
+ server
+ }
+ val authority = s"$redirectServer:$port"
+ new URI(scheme, authority, path, query, null).toString
+ }
+
}
private[spark] case class ServerInfo(
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index e319937702..eb53aa8e23 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -50,7 +50,8 @@ private[spark] class SparkUI private (
var appName: String,
val basePath: String,
val startTime: Long)
- extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
+ extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
+ conf, basePath, "SparkUI")
with Logging
with UIRoot {
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 3925235984..fe4949b9f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -26,7 +26,7 @@ import scala.xml.Node
import org.eclipse.jetty.servlet.ServletContextHandler
import org.json4s.JsonAST.{JNothing, JValue}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,6 +38,7 @@ import org.apache.spark.util.Utils
*/
private[spark] abstract class WebUI(
val securityManager: SecurityManager,
+ val sslOptions: SSLOptions,
port: Int,
conf: SparkConf,
basePath: String = "",
@@ -133,7 +134,7 @@ private[spark] abstract class WebUI(
def bind() {
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
+ serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f2d93edd4f..3f4ac9b2f1 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -86,7 +86,7 @@ private[spark] object AkkaUtils extends Logging {
val secureCookie = if (isAuthOn) secretKey else ""
logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
- val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+ val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
.getOrElse(ConfigFactory.empty())
val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)