diff options
author | scwf <wangfei1@huawei.com> | 2016-01-19 14:49:55 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-01-19 14:49:55 -0800 |
commit | 43f1d59e17d89d19b322d639c5069a3fc0c8e2ed (patch) | |
tree | 9a06aeeeb1345364957ae64bdabedb8381960068 /core | |
parent | efd7eed3222799d66d4fcb68785142dc570c8150 (diff) | |
download | spark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.tar.gz spark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.tar.bz2 spark-43f1d59e17d89d19b322d639c5069a3fc0c8e2ed.zip |
[SPARK-2750][WEB UI] Add https support to the Web UI
Author: scwf <wangfei1@huawei.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: WangTaoTheTonic <wangtao111@huawei.com>
Author: w00228970 <wangfei1@huawei.com>
Closes #10238 from vanzin/SPARK-2750.
Diffstat (limited to 'core')
20 files changed, 273 insertions, 87 deletions
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 3b9c885bf9..261265f0b4 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory * @param keyStore a path to the key-store file * @param keyStorePassword a password to access the key-store file * @param keyPassword a password to access the private key in the key-store + * @param keyStoreType the type of the key-store + * @param needClientAuth set true if SSL needs client authentication * @param trustStore a path to the trust-store file * @param trustStorePassword a password to access the trust-store file + * @param trustStoreType the type of the trust-store * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java * @param enabledAlgorithms a set of encryption algorithms that may be used */ @@ -49,8 +52,11 @@ private[spark] case class SSLOptions( keyStore: Option[File] = None, keyStorePassword: Option[String] = None, keyPassword: Option[String] = None, + keyStoreType: Option[String] = None, + needClientAuth: Boolean = false, trustStore: Option[File] = None, trustStorePassword: Option[String] = None, + trustStoreType: Option[String] = None, protocol: Option[String] = None, enabledAlgorithms: Set[String] = Set.empty) extends Logging { @@ -63,12 +69,18 @@ private[spark] case class SSLOptions( val sslContextFactory = new SslContextFactory() keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) - trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) - trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + keyStoreType.foreach(sslContextFactory.setKeyStoreType) + if (needClientAuth) { + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + trustStoreType.foreach(sslContextFactory.setTrustStoreType) + } protocol.foreach(sslContextFactory.setProtocol) - sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*) + if (supportedAlgorithms.nonEmpty) { + sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*) + } Some(sslContextFactory) } else { @@ -82,6 +94,13 @@ private[spark] case class SSLOptions( */ def createAkkaConfig: Option[Config] = { if (enabled) { + if (keyStoreType.isDefined) { + logWarning("Akka configuration does not support key store type."); + } + if (trustStoreType.isDefined) { + logWarning("Akka configuration does not support trust store type."); + } + Some(ConfigFactory.empty() .withValue("akka.remote.netty.tcp.security.key-store", ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) @@ -110,7 +129,9 @@ private[spark] case class SSLOptions( * The supportedAlgorithms set is a subset of the enabledAlgorithms that * are supported by the current Java security provider for this protocol. */ - private val supportedAlgorithms: Set[String] = { + private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) { + Set() + } else { var context: SSLContext = null try { context = SSLContext.getInstance(protocol.orNull) @@ -133,7 +154,11 @@ private[spark] case class SSLOptions( logDebug(s"Discarding unsupported cipher $cipher") } - enabledAlgorithms & providerAlgorithms + val supported = enabledAlgorithms & providerAlgorithms + require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"), + "SSLContext does not support any of the enabled algorithms: " + + enabledAlgorithms.mkString(",")) + supported } /** Returns a string representation of this SSLOptions with all the passwords masked. */ @@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging { * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory * $ - `[ns].keyStorePassword` - a password to the key-store file * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].keyStoreType` - the type of the key-store + * $ - `[ns].needClientAuth` - whether SSL needs client authentication * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current * directory * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].trustStoreType` - the type of trust-store * $ - `[ns].protocol` - a protocol name supported by a particular Java version * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers * @@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging { val keyPassword = conf.getOption(s"$ns.keyPassword") .orElse(defaults.flatMap(_.keyPassword)) + val keyStoreType = conf.getOption(s"$ns.keyStoreType") + .orElse(defaults.flatMap(_.keyStoreType)) + + val needClientAuth = + conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth)) + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) .orElse(defaults.flatMap(_.trustStore)) val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") .orElse(defaults.flatMap(_.trustStorePassword)) + val trustStoreType = conf.getOption(s"$ns.trustStoreType") + .orElse(defaults.flatMap(_.trustStoreType)) + val protocol = conf.getOption(s"$ns.protocol") .orElse(defaults.flatMap(_.protocol)) @@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging { keyStore, keyStorePassword, keyPassword, + keyStoreType, + needClientAuth, trustStore, trustStorePassword, + trustStoreType, protocol, enabledAlgorithms) } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 64e483e384..c5aec05c03 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) // the default SSL configuration - it will be used by all communication layers unless overwritten private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) - // SSL configuration for different communication layers - they can override the default - // configuration at a specified namespace. The namespace *must* start with spark.ssl. - val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions)) - val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions)) - - logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") - logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") - + // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection(). + val fileServerSSLOptions = getSSLOptions("fs") val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { val trustStoreManagers = for (trustStore <- fileServerSSLOptions.trustStore) yield { @@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) (None, None) } + def getSSLOptions(module: String): SSLOptions = { + val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions)) + logDebug(s"Created SSL options for $module: $opts") + opts + } + /** * Split a comma separated String, filter out any empty items, and return a Set of strings */ diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 3feb7cea59..3e78c7ae24 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -41,8 +41,7 @@ private[deploy] object DeployMessages { worker: RpcEndpointRef, cores: Int, memory: Int, - webUiPort: Int, - publicAddress: String) + workerWebUiUrl: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") assert (port > 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 96007a06e3..1f13d7db34 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -49,7 +49,8 @@ class HistoryServer( provider: ApplicationHistoryProvider, securityManager: SecurityManager, port: Int) - extends WebUI(securityManager, port, conf) with Logging with UIRoot { + extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf) + with Logging with UIRoot { // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) @@ -233,7 +234,7 @@ object HistoryServer extends Logging { val UI_PATH_PREFIX = "/history" - def main(argStrings: Array[String]) { + def main(argStrings: Array[String]): Unit = { Utils.initDaemon(log) new HistoryServerArguments(conf, argStrings) initSecurity() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0deab8ddd5..202a1b787c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -383,7 +383,7 @@ private[deploy] class Master( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterWorker( - id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => { + id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -392,7 +392,7 @@ private[deploy] class Master( context.reply(RegisterWorkerFailed("Duplicate worker ID")) } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - workerRef, workerUiPort, publicAddress) + workerRef, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) context.reply(RegisteredWorker(self, masterWebUiUrl)) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index f751966605..4e20c10fd1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -29,8 +29,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val endpoint: RpcEndpointRef, - val webUiPort: Int, - val publicAddress: String) + val webUiAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -98,10 +97,6 @@ private[spark] class WorkerInfo( coresUsed -= driver.desc.cores } - def webUiAddress : String = { - "http://" + this.publicAddress + ":" + this.webUiPort - } - def setState(state: WorkerState.Value): Unit = { this.state = state } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 750ef0a962..d7543926f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -32,8 +32,8 @@ class MasterWebUI( val master: Master, requestedPort: Int, customMasterPage: Option[MasterPage] = None) - extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging - with UIRoot { + extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"), + requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala index da9740bb41..baad098a0c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -31,7 +31,7 @@ private[spark] class MesosClusterUI( conf: SparkConf, dispatcherPublicAddress: String, val scheduler: MesosClusterScheduler) - extends WebUI(securityManager, port, conf) { + extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { initialize() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 98e17da489..179d3b9f20 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -100,6 +100,7 @@ private[deploy] class Worker( private var master: Option[RpcEndpointRef] = None private var activeMasterUrl: String = "" private[worker] var activeMasterWebUiUrl : String = "" + private var workerWebUiUrl: String = "" private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false private var connected = false @@ -184,6 +185,9 @@ private[deploy] class Worker( shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + + val scheme = if (webUi.sslOptions.enabled) "https" else "http" + workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" registerWithMaster() metricsSystem.registerSource(workerSource) @@ -336,7 +340,7 @@ private[deploy] class Worker( private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( - workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress)) + workerId, host, port, self, cores, memory, workerWebUiUrl)) .onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 1a0598e50d..b45b682494 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -34,7 +34,8 @@ class WorkerWebUI( val worker: Worker, val workDir: File, requestedPort: Int) - extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") + extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"), + requestedPort, worker.conf, name = "WorkerUI") with Logging { private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b796a44fe0..bc143b7de3 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,21 +17,24 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.{URI, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import scala.collection.mutable.{ArrayBuffer, StringBuilder} import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.{Connector, Request, Server} import org.eclipse.jetty.server.handler._ +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions} import org.apache.spark.util.Utils /** @@ -224,23 +227,51 @@ private[spark] object JettyUtils extends Logging { def startJettyServer( hostName: String, port: Int, + sslOptions: SSLOptions, handlers: Seq[ServletContextHandler], conf: SparkConf, serverName: String = ""): ServerInfo = { + val collection = new ContextHandlerCollection addFilters(handlers, conf) - val collection = new ContextHandlerCollection val gzipHandlers = handlers.map { h => val gzipHandler = new GzipHandler gzipHandler.setHandler(h) gzipHandler } - collection.setHandlers(gzipHandlers.toArray) // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { - val server = new Server(new InetSocketAddress(hostName, currentPort)) + val server = new Server + val connectors = new ArrayBuffer[Connector] + // Create a connector on port currentPort to listen for HTTP requests + val httpConnector = new SelectChannelConnector() + httpConnector.setPort(currentPort) + connectors += httpConnector + + sslOptions.createJettySslContextFactory().foreach { factory => + // If the new port wraps around, do not try a privileged port. + val securePort = + if (currentPort != 0) { + (currentPort + 400 - 1024) % (65536 - 1024) + 1024 + } else { + 0 + } + val scheme = "https" + // Create a connector on port securePort to listen for HTTPS requests + val connector = new SslSelectChannelConnector(factory) + connector.setPort(securePort) + connectors += connector + + // redirect the HTTP requests to HTTPS port + collection.addHandler(createRedirectHttpsHandler(securePort, scheme)) + } + + gzipHandlers.foreach(collection.addHandler) + connectors.foreach(_.setHost(hostName)) + server.setConnectors(connectors.toArray) + val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -262,6 +293,42 @@ private[spark] object JettyUtils extends Logging { val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName) ServerInfo(server, boundPort, collection) } + + private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { + val redirectHandler: ContextHandler = new ContextHandler + redirectHandler.setContextPath("/") + redirectHandler.setHandler(new AbstractHandler { + override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + if (baseRequest.isSecure) { + return + } + val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.encodeRedirectURL(httpsURI) + response.sendRedirect(httpsURI) + baseRequest.setHandled(true) + } + }) + redirectHandler + } + + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. + private def createRedirectURI( + scheme: String, server: String, port: Int, path: String, query: String) = { + val redirectServer = if (server.contains(":") && !server.startsWith("[")) { + s"[${server}]" + } else { + server + } + val authority = s"$redirectServer:$port" + new URI(scheme, authority, path, query, null).toString + } + } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index e319937702..eb53aa8e23 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,7 +50,8 @@ private[spark] class SparkUI private ( var appName: String, val basePath: String, val startTime: Long) - extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") + extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), + conf, basePath, "SparkUI") with Logging with UIRoot { diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 3925235984..fe4949b9f6 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -26,7 +26,7 @@ import scala.xml.Node import org.eclipse.jetty.servlet.ServletContextHandler import org.json4s.JsonAST.{JNothing, JValue} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,6 +38,7 @@ import org.apache.spark.util.Utils */ private[spark] abstract class WebUI( val securityManager: SecurityManager, + val sslOptions: SSLOptions, port: Int, conf: SparkConf, basePath: String = "", @@ -133,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) + serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index f2d93edd4f..3f4ac9b2f1 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -86,7 +86,7 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig + val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig .getOrElse(ConfigFactory.empty()) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava) diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore Binary files differnew file mode 100644 index 0000000000..f30716b57b --- /dev/null +++ b/core/src/test/resources/spark.keystore diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index e0226803bb..7603cef773 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -181,9 +181,10 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256") val securityManager = new SecurityManager(conf) + val akkaSSLOptions = securityManager.getSSLOptions("akka") assert(securityManager.fileServerSSLOptions.enabled === true) - assert(securityManager.akkaSSLOptions.enabled === true) + assert(akkaSSLOptions.enabled === true) assert(securityManager.sslSocketFactory.isDefined === true) assert(securityManager.hostnameVerifier.isDefined === true) @@ -198,15 +199,15 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2")) assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms) - assert(securityManager.akkaSSLOptions.trustStore.isDefined === true) - assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore") - assert(securityManager.akkaSSLOptions.keyStore.isDefined === true) - assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore") - assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password")) - assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password")) - assert(securityManager.akkaSSLOptions.keyPassword === Some("password")) - assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1.2")) - assert(securityManager.akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) + assert(akkaSSLOptions.trustStore.isDefined === true) + assert(akkaSSLOptions.trustStore.get.getName === "truststore") + assert(akkaSSLOptions.keyStore.isDefined === true) + assert(akkaSSLOptions.keyStore.get.getName === "keystore") + assert(akkaSSLOptions.trustStorePassword === Some("password")) + assert(akkaSSLOptions.keyStorePassword === Some("password")) + assert(akkaSSLOptions.keyPassword === Some("password")) + assert(akkaSSLOptions.protocol === Some("TLSv1.2")) + assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) } test("ssl off setup") { @@ -218,7 +219,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { val securityManager = new SecurityManager(conf) assert(securityManager.fileServerSSLOptions.enabled === false) - assert(securityManager.akkaSSLOptions.enabled === false) assert(securityManager.sslSocketFactory.isDefined === false) assert(securityManager.hostnameVerifier.isDefined === false) } diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 86455a13d0..190e4dd728 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -50,7 +50,7 @@ private[deploy] object DeployTestUtils { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 10e33a32ba..ce00807ea4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -90,8 +90,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva cores = 0, memory = 0, endpoint = null, - webUiPort = 0, - publicAddress = "" + webUiAddress = "http://localhost:80" ) val (rpcEnv, _, _) = @@ -376,7 +375,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { val workerId = System.currentTimeMillis.toString - new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") + new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, "http://localhost:80") } private def scheduleExecutorsOnWorkers( diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index b4deed7f87..62fe0eaedf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -88,9 +88,7 @@ class PersistenceEngineSuite extends SparkFunSuite { cores = 0, memory = 0, endpoint = workerEndpoint, - webUiPort = 0, - publicAddress = "" - ) + webUiAddress = "http://localhost:80") persistenceEngine.addWorker(workerToPersist) @@ -109,8 +107,7 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(workerToPersist.cores === recoveryWorkerInfo.cores) assert(workerToPersist.memory === recoveryWorkerInfo.memory) assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint) - assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort) - assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress) + assert(workerToPersist.webUiAddress === recoveryWorkerInfo.webUiAddress) } finally { testRpcEnv.shutdown() testRpcEnv.awaitTermination() diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 2d28b67ef2..69c46058f1 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -17,16 +17,16 @@ package org.apache.spark.ui -import java.net.ServerSocket +import java.net.{BindException, ServerSocket} import scala.io.Source -import scala.util.{Failure, Success, Try} +import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.LocalSparkContext._ class UISuite extends SparkFunSuite { @@ -45,6 +45,20 @@ class UISuite extends SparkFunSuite { sc } + private def sslDisabledConf(): (SparkConf, SSLOptions) = { + val conf = new SparkConf + (conf, new SecurityManager(conf).getSSLOptions("ui")) + } + + private def sslEnabledConf(): (SparkConf, SSLOptions) = { + val conf = new SparkConf() + .set("spark.ssl.ui.enabled", "true") + .set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ssl.ui.keyStorePassword", "123456") + .set("spark.ssl.ui.keyPassword", "123456") + (conf, new SecurityManager(conf).getSSLOptions("ui")) + } + ignore("basic ui visibility") { withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible @@ -70,33 +84,92 @@ class UISuite extends SparkFunSuite { } test("jetty selects different port under contention") { - val server = new ServerSocket(0) - val startPort = server.getLocalPort - val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - // Allow some wiggle room in case ports on the machine are under contention - val boundPort1 = serverInfo1.boundPort - val boundPort2 = serverInfo2.boundPort - assert(boundPort1 != startPort) - assert(boundPort2 != startPort) - assert(boundPort1 != boundPort2) - serverInfo1.server.stop() - serverInfo2.server.stop() - server.close() + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + val (conf, sslOptions) = sslDisabledConf() + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf) + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf) + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + stopServer(serverInfo1) + stopServer(serverInfo2) + closeSocket(server) + } + } + + test("jetty with https selects different port under contention") { + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + val (conf, sslOptions) = sslEnabledConf() + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server1") + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server2") + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + stopServer(serverInfo1) + stopServer(serverInfo2) + closeSocket(server) + } } test("jetty binds to port 0 correctly") { - val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) - val server = serverInfo.server - val boundPort = serverInfo.boundPort - assert(server.getState === "STARTED") - assert(boundPort != 0) - Try { new ServerSocket(boundPort) } match { - case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) - case Failure(e) => + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + val (conf, sslOptions) = sslDisabledConf() + try { + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, sslOptions, Seq[ServletContextHandler](), conf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + stopServer(serverInfo) + closeSocket(socket) + } + } + + test("jetty with https binds to port 0 correctly") { + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + try { + val (conf, sslOptions) = sslEnabledConf() + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, sslOptions, Seq[ServletContextHandler](), conf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + stopServer(serverInfo) + closeSocket(socket) } } @@ -117,4 +190,12 @@ class UISuite extends SparkFunSuite { assert(splitUIAddress(2).toInt == boundPort) } } + + def stopServer(info: ServerInfo): Unit = { + if (info != null && info.server != null) info.server.stop + } + + def closeSocket(socket: ServerSocket): Unit = { + if (socket != null) socket.close + } } |