From cfea30037ff4ac7e386a1478e7dce07ca3bb9072 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Mon, 2 Feb 2015 17:18:54 -0800 Subject: Spark 3883: SSL support for HttpServer and Akka SPARK-3883: SSL support for Akka connections and Jetty based file servers. This story introduced the following changes: - Introduced SSLOptions object which holds the SSL configuration and can build the appropriate configuration for Akka or Jetty. SSLOptions can be created by parsing SparkConf entries at a specified namespace. - SSLOptions is created and kept by SecurityManager - All Akka actor address creation snippets based on interpolated strings were replaced by a dedicated methods from AkkaUtils. Those methods select the proper Akka protocol - whether akka.tcp or akka.ssl.tcp - Added tests cases for AkkaUtils, FileServer, SSLOptions and SecurityManager - Added a way to use node local SSL configuration by executors and driver in standalone mode. It can be done by specifying spark.ssl.useNodeLocalConf in SparkConf. - Made CoarseGrainedExecutorBackend not overwrite the settings which are executor startup configuration - they are passed anyway from Worker Refer to https://github.com/apache/spark/pull/3571 for discussion and details Author: Jacek Lewandowski Author: Jacek Lewandowski Closes #3571 from jacek-lewandowski/SPARK-3883-master and squashes the following commits: 9ef4ed1 [Jacek Lewandowski] Merge pull request #2 from jacek-lewandowski/SPARK-3883-docs2 fb31b49 [Jacek Lewandowski] SPARK-3883: Added SSL setup documentation 2532668 [Jacek Lewandowski] SPARK-3883: Refactored AkkaUtils.protocol method to not use Try 90a8762 [Jacek Lewandowski] SPARK-3883: Refactored methods to resolve Akka address and made it possible to easily configure multiple communication layers for SSL 72b2541 [Jacek Lewandowski] SPARK-3883: A reference to the fallback SSLOptions can be provided when constructing SSLOptions 93050f4 [Jacek Lewandowski] SPARK-3883: SSL support for HttpServer and Akka --- .../main/scala/org/apache/spark/HttpServer.scala | 11 +- .../main/scala/org/apache/spark/SSLOptions.scala | 178 +++++++++++++++++++++ .../scala/org/apache/spark/SecurityManager.scala | 100 +++++++++++- .../main/scala/org/apache/spark/SparkConf.scala | 1 + .../org/apache/spark/broadcast/HttpBroadcast.scala | 1 + .../spark/deploy/ApplicationDescription.scala | 9 ++ .../scala/org/apache/spark/deploy/Client.scala | 5 +- .../apache/spark/deploy/DriverDescription.scala | 8 + .../org/apache/spark/deploy/client/AppClient.scala | 7 +- .../org/apache/spark/deploy/master/Master.scala | 8 +- .../spark/deploy/worker/ExecutorRunner.scala | 2 +- .../org/apache/spark/deploy/worker/Worker.scala | 67 ++++++-- .../executor/CoarseGrainedExecutorBackend.scala | 16 +- .../scheduler/cluster/SimrSchedulerBackend.scala | 4 +- .../cluster/SparkDeploySchedulerBackend.scala | 5 +- .../mesos/CoarseMesosSchedulerBackend.scala | 5 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 36 ++++- .../main/scala/org/apache/spark/util/Utils.scala | 20 ++- 18 files changed, 440 insertions(+), 43 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/SSLOptions.scala (limited to 'core/src/main') diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index fa22787ce7..09a9ccc226 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.eclipse.jetty.server.ssl.SslSocketConnector import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} @@ -72,7 +73,10 @@ private[spark] class HttpServer( */ private def doStart(startPort: Int): (Server, Int) = { val server = new Server() - val connector = new SocketConnector + + val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory() + .map(new SslSocketConnector(_)).getOrElse(new SocketConnector) + connector.setMaxIdleTime(60 * 1000) connector.setSoLingerTime(-1) connector.setPort(startPort) @@ -149,13 +153,14 @@ private[spark] class HttpServer( } /** - * Get the URI of this HTTP server (http://host:port) + * Get the URI of this HTTP server (http://host:port or https://host:port) */ def uri: String = { if (server == null) { throw new ServerStateException("Server is not started") } else { - "http://" + Utils.localIpAddress + ":" + port + val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http" + s"$scheme://${Utils.localIpAddress}:$port" } } } diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala new file mode 100644 index 0000000000..2cdc167f85 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.File + +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.eclipse.jetty.util.ssl.SslContextFactory + +/** + * SSLOptions class is a common container for SSL configuration options. It offers methods to + * generate specific objects to configure SSL for different communication protocols. + * + * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported + * by the protocol, which it can generate the configuration for. Since Akka doesn't support client + * authentication with SSL, SSLOptions cannot support it either. + * + * @param enabled enables or disables SSL; if it is set to false, the rest of the + * settings are disregarded + * @param keyStore a path to the key-store file + * @param keyStorePassword a password to access the key-store file + * @param keyPassword a password to access the private key in the key-store + * @param trustStore a path to the trust-store file + * @param trustStorePassword a password to access the trust-store file + * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java + * @param enabledAlgorithms a set of encryption algorithms to use + */ +private[spark] case class SSLOptions( + enabled: Boolean = false, + keyStore: Option[File] = None, + keyStorePassword: Option[String] = None, + keyPassword: Option[String] = None, + trustStore: Option[File] = None, + trustStorePassword: Option[String] = None, + protocol: Option[String] = None, + enabledAlgorithms: Set[String] = Set.empty) { + + /** + * Creates a Jetty SSL context factory according to the SSL settings represented by this object. + */ + def createJettySslContextFactory(): Option[SslContextFactory] = { + if (enabled) { + val sslContextFactory = new SslContextFactory() + + keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + protocol.foreach(sslContextFactory.setProtocol) + sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) + + Some(sslContextFactory) + } else { + None + } + } + + /** + * Creates an Akka configuration object which contains all the SSL settings represented by this + * object. It can be used then to compose the ultimate Akka configuration. + */ + def createAkkaConfig: Option[Config] = { + import scala.collection.JavaConversions._ + if (enabled) { + Some(ConfigFactory.empty() + .withValue("akka.remote.netty.tcp.security.key-store", + ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-store-password", + ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store", + ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store-password", + ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-password", + ConfigValueFactory.fromAnyRef(keyPassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.random-number-generator", + ConfigValueFactory.fromAnyRef("")) + .withValue("akka.remote.netty.tcp.security.protocol", + ConfigValueFactory.fromAnyRef(protocol.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.enabled-algorithms", + ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq)) + .withValue("akka.remote.netty.tcp.enable-ssl", + ConfigValueFactory.fromAnyRef(true))) + } else { + None + } + } + + /** Returns a string representation of this SSLOptions with all the passwords masked. */ + override def toString: String = s"SSLOptions{enabled=$enabled, " + + s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " + + s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " + + s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}" + +} + +private[spark] object SSLOptions extends Logging { + + /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace. + * + * The following settings are allowed: + * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively + * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory + * $ - `[ns].keyStorePassword` - a password to the key-store file + * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current + * directory + * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].protocol` - a protocol name supported by a particular Java version + * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers + * + * For a list of protocols and ciphers supported by particular Java versions, you may go to + * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle + * blog page]]. + * + * You can optionally specify the default configuration. If you do, for each setting which is + * missing in SparkConf, the corresponding setting is used from the default configuration. + * + * @param conf Spark configuration object where the settings are collected from + * @param ns the namespace name + * @param defaults the default configuration + * @return [[org.apache.spark.SSLOptions]] object + */ + def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { + val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) + + val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_)) + .orElse(defaults.flatMap(_.keyStore)) + + val keyStorePassword = conf.getOption(s"$ns.keyStorePassword") + .orElse(defaults.flatMap(_.keyStorePassword)) + + val keyPassword = conf.getOption(s"$ns.keyPassword") + .orElse(defaults.flatMap(_.keyPassword)) + + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) + .orElse(defaults.flatMap(_.trustStore)) + + val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") + .orElse(defaults.flatMap(_.trustStorePassword)) + + val protocol = conf.getOption(s"$ns.protocol") + .orElse(defaults.flatMap(_.protocol)) + + val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms") + .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet) + .orElse(defaults.map(_.enabledAlgorithms)) + .getOrElse(Set.empty) + + new SSLOptions( + enabled, + keyStore, + keyStorePassword, + keyPassword, + trustStore, + trustStorePassword, + protocol, + enabledAlgorithms) + } + +} + diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index ec82d09cd0..88d35a4bac 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -18,7 +18,11 @@ package org.apache.spark import java.net.{Authenticator, PasswordAuthentication} +import java.security.KeyStore +import java.security.cert.X509Certificate +import javax.net.ssl._ +import com.google.common.io.Files import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil @@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators * who always have permission to view or modify the Spark application. * - * Spark does not currently support encryption after authentication. + * Starting from version 1.3, Spark has partial support for encrypted connections with SSL. * * At this point spark has multiple communication protocols that need to be secured and * different underlying mechanisms are used depending on the protocol: @@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder * to connect to the server. There is no control of the underlying * authentication mechanism so its not clear if the password is passed in * plaintext or uses DIGEST-MD5 or some other mechanism. - * Akka also has an option to turn on SSL, this option is not currently supported - * but we could add a configuration option in the future. + * + * Akka also has an option to turn on SSL, this option is currently supported (see + * the details below). * * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty * for the HttpServer. Jetty supports multiple authentication mechanisms - @@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder * to authenticate using DIGEST-MD5 via a single user and the shared secret. * Since we are using DIGEST-MD5, the shared secret is not passed on the wire * in plaintext. - * We currently do not support SSL (https), but Jetty can be configured to use it - * so we could add a configuration option for this in the future. + * + * We currently support SSL (https) for this communication protocol (see the details + * below). * * The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5. * Any clients must specify the user and password. There is a default @@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder * authentication. Spark will then use that user to compare against the view acls to do * authorization. If not filter is in place the user is generally null and no authorization * can take place. + * + * Connection encryption (SSL) configuration is organized hierarchically. The user can configure + * the default SSL settings which will be used for all the supported communication protocols unless + * they are overwritten by protocol specific settings. This way the user can easily provide the + * common settings for all the protocols without disabling the ability to configure each one + * individually. + * + * All the SSL settings like `spark.ssl.xxx` where `xxx` is a particular configuration property, + * denote the global configuration for all the supported protocols. In order to override the global + * configuration for the particular protocol, the properties must be overwritten in the + * protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global + * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for + * Akka based connections or `fs` for broadcast and file server. + * + * Refer to [[org.apache.spark.SSLOptions]] documentation for the list of + * options that can be specified. + * + * SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions + * object parses Spark configuration at a given namespace and builds the common representation + * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like + * TypeSafe configuration for Akka or SSLContextFactory for Jetty. + * + * SSL must be configured on each node and configured for each component involved in + * communication using the particular protocol. In YARN clusters, the key-store can be prepared on + * the client side then distributed and used by the executors as the part of the application + * (YARN allows the user to deploy files before the application is started). + * In standalone deployment, the user needs to provide key-stores and configuration + * options for master and workers. In this mode, the user may allow the executors to use the SSL + * settings inherited from the worker which spawned that executor. It can be accomplished by + * setting `spark.ssl.useNodeLocalConf` to `true`. */ -private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder { +private[spark] class SecurityManager(sparkConf: SparkConf) + extends Logging with SecretKeyHolder { // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" @@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with ) } + // the default SSL configuration - it will be used by all communication layers unless overwritten + private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) + + // SSL configuration for different communication layers - they can override the default + // configuration at a specified namespace. The namespace *must* start with spark.ssl. + val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions)) + val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions)) + + logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") + logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") + + val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { + val trustStoreManagers = + for (trustStore <- fileServerSSLOptions.trustStore) yield { + val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream() + + try { + val ks = KeyStore.getInstance(KeyStore.getDefaultType) + ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray) + + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(ks) + tmf.getTrustManagers + } finally { + input.close() + } + } + + lazy val credulousTrustStoreManagers = Array({ + logWarning("Using 'accept-all' trust manager for SSL connections.") + new X509TrustManager { + override def getAcceptedIssuers: Array[X509Certificate] = null + + override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {} + + override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {} + }: TrustManager + }) + + val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default")) + sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null) + + val hostVerifier = new HostnameVerifier { + override def verify(s: String, sslSession: SSLSession): Boolean = true + } + + (Some(sslContext.getSocketFactory), Some(hostVerifier)) + } else { + (None, None) + } + /** * Split a comma separated String, filter out any empty items, and return a Set of strings */ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 4d4c69d42d..13aa9960ac 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -370,6 +370,7 @@ private[spark] object SparkConf { isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") || + name.startsWith("spark.ssl") || isSparkPortConf(name) } diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index ea98051532..1444c0dd3d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging { uc = new URL(url).openConnection() uc.setConnectTimeout(httpReadTimeout) } + Utils.setupSecureURLConnection(uc, securityManager) val in = { uc.setReadTimeout(httpReadTimeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 65a1a8fd7e..ae55b4ff40 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -28,5 +28,14 @@ private[spark] class ApplicationDescription( val user = System.getProperty("user.name", "") + def copy( + name: String = name, + maxCores: Option[Int] = maxCores, + memoryPerSlave: Int = memoryPerSlave, + command: Command = command, + appUiUrl: String = appUiUrl, + eventLogDir: Option[String] = eventLogDir): ApplicationDescription = + new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir) + override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7c1c831c24..38b3da0b13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val timeout = AkkaUtils.askTimeout(conf) override def preStart() = { - masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) + masterActor = context.actorSelection( + Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system))) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -161,7 +162,7 @@ object Client { "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely - Master.toAkkaUrl(driverArgs.master) + Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 58c95dc4f9..b056a19ce6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -25,5 +25,13 @@ private[spark] class DriverDescription( val command: Command) extends Serializable { + def copy( + jarUrl: String = jarUrl, + mem: Int = mem, + cores: Int = cores, + supervise: Boolean = supervise, + command: Command = command): DriverDescription = + new DriverDescription(jarUrl, mem, cores, supervise, command) + override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 39a7b0319b..ffe940fbda 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -47,7 +47,7 @@ private[spark] class AppClient( conf: SparkConf) extends Logging { - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -107,8 +107,9 @@ private[spark] class AppClient( def changeMaster(url: String) { // activeMasterUrl is a valid Spark url since we receive it from master. activeMasterUrl = url - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = Master.toAkkaAddress(activeMasterUrl) + master = context.actorSelection( + Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem))) + masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem)) } private def isPossibleMaster(remoteUrl: Address) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d92d99310a..5eeb9fe526 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -860,9 +860,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaUrl(sparkUrl: String): String = { + def toAkkaUrl(sparkUrl: String, protocol: String): String = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + AkkaUtils.address(protocol, systemName, host, port, actorName) } /** @@ -870,9 +870,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaAddress(sparkUrl: String): Address = { + def toAkkaAddress(sparkUrl: String, protocol: String): Address = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - Address("akka.tcp", systemName, host, port) + Address(protocol, systemName, host, port) } def startSystemAndActor( diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index acbdf0d8bd..bc9f78b9e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged import org.apache.spark.util.logging.FileAppender diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 1359983012..b20f5c0c82 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -31,8 +31,8 @@ import scala.util.Random import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI @@ -93,7 +93,12 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" - val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + val akkaUrl = AkkaUtils.address( + AkkaUtils.protocol(context.system), + actorSystemName, + host, + port, + actorName) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -174,8 +179,9 @@ private[spark] class Worker( // activeMasterUrl it's a valid Spark url since we receive it from master. activeMasterUrl = url activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = Master.toAkkaAddress(activeMasterUrl) + master = context.actorSelection( + Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system))) + masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system)) connected = true // Cancel any outstanding re-registration attempts because we found a new master registrationRetryTimer.foreach(_.cancel()) @@ -347,10 +353,20 @@ private[spark] class Worker( }.toSeq } appDirectories(appId) = appLocalDirs - - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs, - ExecutorState.LOADING) + val manager = new ExecutorRunner( + appId, + execId, + appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), + cores_, + memory_, + self, + workerId, + host, + sparkHome, + executorDir, + akkaUrl, + conf, + appLocalDirs, ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -406,7 +422,14 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl) + val driver = new DriverRunner( + conf, + driverId, + workDir, + sparkHome, + driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), + self, + akkaUrl) drivers(driverId) = driver driver.start() @@ -523,10 +546,32 @@ private[spark] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) } + private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = { + val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r + val result = cmd.javaOpts.collectFirst { + case pattern(_result) => _result.toBoolean + } + result.getOrElse(false) + } + + private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = { + val prefix = "spark.ssl." + val useNLC = "spark.ssl.useNodeLocalConf" + if (isUseLocalNodeSSLConfig(cmd)) { + val newJavaOpts = cmd.javaOpts + .filter(opt => !opt.startsWith(s"-D$prefix")) ++ + conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+ + s"-D$useNLC=true" + cmd.copy(javaOpts = newJavaOpts) + } else { + cmd + } + } + } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8238253026..bc72c89703 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val executorConf = new SparkConf val port = executorConf.getInt("spark.executor.port", 0) val (fetcher, _) = AkkaUtils.createActorSystem( - "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf)) + "driverPropsFetcher", + hostname, + port, + executorConf, + new SecurityManager(executorConf)) val driver = fetcher.actorSelection(driverUrl) val timeout = AkkaUtils.askTimeout(executorConf) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) @@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver. - val driverConf = new SparkConf().setAll(props) + val driverConf = new SparkConf() + for ((key, value) <- props) { + // this is required for SSL in standalone mode + if (SparkConf.isExecutorStartupConf(key)) { + driverConf.setIfMissing(key, value) + } else { + driverConf.set(key, value) + } + } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index ee10aa061f..06786a5952 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{Logging, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.AkkaUtils private[spark] class SimrSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -38,7 +39,8 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7eb87a564d..d2e1680a5f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class SparkDeploySchedulerBackend( scheduler: TaskSchedulerImpl, @@ -46,7 +46,8 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 5289661eb8..0d1c2a916c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, AkkaUtils} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -143,7 +143,8 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(sc.env.actorSystem), SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 4c9b1e3c46..3d9c6192ff 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Try import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -91,8 +92,11 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( - ConfigFactory.parseString( + val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig + .getOrElse(ConfigFactory.empty()) + + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]) + .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -214,7 +218,7 @@ private[spark] object AkkaUtils extends Logging { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name" + val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) @@ -228,9 +232,33 @@ private[spark] object AkkaUtils extends Logging { actorSystem: ActorSystem): ActorRef = { val executorActorSystemName = SparkEnv.executorActorSystemName Utils.checkHost(host, "Expected hostname") - val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name" + val url = address(protocol(actorSystem), executorActorSystemName, host, port, name) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } + + def protocol(actorSystem: ActorSystem): String = { + val akkaConf = actorSystem.settings.config + val sslProp = "akka.remote.netty.tcp.enable-ssl" + protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp)) + } + + def protocol(ssl: Boolean = false): String = { + if (ssl) { + "akka.ssl.tcp" + } else { + "akka.tcp" + } + } + + def address( + protocol: String, + systemName: String, + host: String, + port: Any, + actorName: String): String = { + s"$protocol://$systemName@$host:$port/user/$actorName" + } + } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 31850b50bd..e9f2aed9ff 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,8 +21,9 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import java.util.{Locale, Properties, Random, UUID} +import java.util.{Properties, Locale, Random, UUID} +import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} +import javax.net.ssl.HttpsURLConnection import scala.collection.JavaConversions._ import scala.collection.Map @@ -575,6 +576,7 @@ private[spark] object Utils extends Logging { logDebug("fetchFile not using security") uc = new URL(url).openConnection() } + Utils.setupSecureURLConnection(uc, securityMgr) val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 uc.setConnectTimeout(timeout) @@ -1820,6 +1822,20 @@ private[spark] object Utils extends Logging { PropertyConfigurator.configure(pro) } + /** + * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and + * the host verifier from the given security manager. + */ + def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = { + urlConnection match { + case https: HttpsURLConnection => + sm.sslSocketFactory.foreach(https.setSSLSocketFactory) + sm.hostnameVerifier.foreach(https.setHostnameVerifier) + https + case connection => connection + } + } + def invoke( clazz: Class[_], obj: AnyRef, -- cgit v1.2.3