Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpServer.scala                                        |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/SSLOptions.scala                                        | 178
-rw-r--r--  core/src/main/scala/org/apache/spark/SecurityManager.scala                                   | 100
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                         |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala                           |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala                     |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala                                     |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala                          |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala                           |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                              |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                      |   2
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                              |  67
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala             |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala            |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala     |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                    |  36
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                        |  20
18 files changed, 440 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index fa22787ce7..09a9ccc226 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.File
+import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
- val connector = new SocketConnector
+
+ val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
+ .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
}
/**
- * Get the URI of this HTTP server (http://host:port)
+ * Get the URI of this HTTP server (http://host:port or https://host:port)
*/
def uri: String = {
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
- "http://" + Utils.localIpAddress + ":" + port
+ val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
+ s"$scheme://${Utils.localIpAddress}:$port"
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
new file mode 100644
index 0000000000..2cdc167f85
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+/**
+ * The SSLOptions class is a common container for SSL configuration options. It offers methods to
+ * generate specific objects that configure SSL for different communication protocols.
+ *
+ * SSLOptions is intended to provide the maximum common set of SSL settings supported by every
+ * protocol for which it can generate a configuration. Since Akka does not support client
+ * authentication with SSL, SSLOptions does not support it either.
+ *
+ * @param enabled enables or disables SSL; if it is set to false, the rest of the
+ * settings are disregarded
+ * @param keyStore a path to the key-store file
+ * @param keyStorePassword a password to access the key-store file
+ * @param keyPassword a password to access the private key in the key-store
+ * @param trustStore a path to the trust-store file
+ * @param trustStorePassword a password to access the trust-store file
+ * @param protocol SSL protocol name supported by Java (remember that SSLv3 was compromised)
+ * @param enabledAlgorithms a set of encryption algorithms to use
+ */
+private[spark] case class SSLOptions(
+ enabled: Boolean = false,
+ keyStore: Option[File] = None,
+ keyStorePassword: Option[String] = None,
+ keyPassword: Option[String] = None,
+ trustStore: Option[File] = None,
+ trustStorePassword: Option[String] = None,
+ protocol: Option[String] = None,
+ enabledAlgorithms: Set[String] = Set.empty) {
+
+ /**
+ * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
+ */
+ def createJettySslContextFactory(): Option[SslContextFactory] = {
+ if (enabled) {
+ val sslContextFactory = new SslContextFactory()
+
+ keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
+ trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+ keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
+ trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+ keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+ protocol.foreach(sslContextFactory.setProtocol)
+ sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+
+ Some(sslContextFactory)
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Creates an Akka configuration object which contains all the SSL settings represented by this
+ * object. It can then be used to compose the final Akka configuration.
+ */
+ def createAkkaConfig: Option[Config] = {
+ import scala.collection.JavaConversions._
+ if (enabled) {
+ Some(ConfigFactory.empty()
+ .withValue("akka.remote.netty.tcp.security.key-store",
+ ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.key-store-password",
+ ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.trust-store",
+ ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.trust-store-password",
+ ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.key-password",
+ ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.random-number-generator",
+ ConfigValueFactory.fromAnyRef(""))
+ .withValue("akka.remote.netty.tcp.security.protocol",
+ ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
+ ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+ .withValue("akka.remote.netty.tcp.enable-ssl",
+ ConfigValueFactory.fromAnyRef(true)))
+ } else {
+ None
+ }
+ }
+
+ /** Returns a string representation of this SSLOptions with all the passwords masked. */
+ override def toString: String = s"SSLOptions{enabled=$enabled, " +
+ s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+ s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+ s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+
+}
+
+private[spark] object SSLOptions extends Logging {
+
+ /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+ *
+ * The following settings are allowed:
+ * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+ * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+ * $ - `[ns].keyStorePassword` - a password to the key-store file
+ * $ - `[ns].keyPassword` - a password to the private key
+ * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
+ * directory
+ * $ - `[ns].trustStorePassword` - a password to the trust-store file
+ * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+ * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+ *
+ * For a list of protocols and ciphers supported by particular Java versions, you may go to
+ * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
+ * blog page]].
+ *
+ * You can optionally specify the default configuration. If you do, for each setting which is
+ * missing in SparkConf, the corresponding setting is used from the default configuration.
+ *
+ * @param conf Spark configuration object where the settings are collected from
+ * @param ns the namespace name
+ * @param defaults the default configuration
+ * @return [[org.apache.spark.SSLOptions]] object
+ */
+ def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
+ val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+ val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+ .orElse(defaults.flatMap(_.keyStore))
+
+ val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+ .orElse(defaults.flatMap(_.keyStorePassword))
+
+ val keyPassword = conf.getOption(s"$ns.keyPassword")
+ .orElse(defaults.flatMap(_.keyPassword))
+
+ val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+ .orElse(defaults.flatMap(_.trustStore))
+
+ val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+ .orElse(defaults.flatMap(_.trustStorePassword))
+
+ val protocol = conf.getOption(s"$ns.protocol")
+ .orElse(defaults.flatMap(_.protocol))
+
+ val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
+ .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+ .orElse(defaults.map(_.enabledAlgorithms))
+ .getOrElse(Set.empty)
+
+ new SSLOptions(
+ enabled,
+ keyStore,
+ keyStorePassword,
+ keyPassword,
+ trustStore,
+ trustStorePassword,
+ protocol,
+ enabledAlgorithms)
+ }
+
+}
+
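To illustrate the namespace fallback implemented by SSLOptions.parse above, here is a minimal sketch. The object name, paths, and passwords are hypothetical, and the snippet assumes it is compiled inside the org.apache.spark package (for example as a test) so the private[spark] API is visible: settings read from the protocol-specific spark.ssl.fs namespace fall back to the global spark.ssl values passed in as defaults.

package org.apache.spark

// Sketch only: protocol-specific settings inherit from the global spark.ssl namespace.
object SSLOptionsParseSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore.jks")                // hypothetical path
      .set("spark.ssl.keyStorePassword", "secret")
      .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")
      .set("spark.ssl.fs.protocol", "TLSv1")                             // file-server-only override

    val globalOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
    val fsOpts     = SSLOptions.parse(conf, "spark.ssl.fs", defaults = Some(globalOpts))

    // fsOpts keeps the key-store and cipher list from the global namespace,
    // but uses the protocol that was defined only under spark.ssl.fs.
    println(globalOpts)
    println(fsOpts)
  }
}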
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index ec82d09cd0..88d35a4bac 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
package org.apache.spark
import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._
+import com.google.common.io.Files
import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
@@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
* who always have permission to view or modify the Spark application.
*
- * Spark does not currently support encryption after authentication.
+ * Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
*
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
@@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
- * Akka also has an option to turn on SSL, this option is not currently supported
- * but we could add a configuration option in the future.
+ *
+ * Akka also has an option to turn on SSL; this option is currently supported (see
+ * the details below).
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
@@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
- * We currently do not support SSL (https), but Jetty can be configured to use it
- * so we could add a configuration option for this in the future.
+ *
+ * We currently support SSL (https) for this communication protocol (see the details
+ * below).
*
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
* Any clients must specify the user and password. There is a default
@@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
+ *
+ * Connection encryption (SSL) configuration is organized hierarchically. The user can configure
+ * default SSL settings, which are used for all the supported communication protocols unless
+ * they are overridden by protocol-specific settings. This way the user can easily provide
+ * common settings for all the protocols once, while still being able to configure each one
+ * individually.
+ *
+ * Settings of the form `spark.ssl.xxx`, where `xxx` is a particular configuration property,
+ * denote the global configuration for all the supported protocols. To override the global
+ * configuration for a particular protocol, the properties must be set again in the
+ * protocol-specific namespace: use `spark.ssl.yyy.xxx` to overwrite the global configuration
+ * for the protocol denoted by `yyy`. Currently `yyy` can be either `akka` for Akka-based
+ * connections or `fs` for the broadcast and file server.
+ *
+ * Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
+ * options that can be specified.
+ *
+ * SecurityManager initializes SSLOptions objects for the different protocols separately. An
+ * SSLOptions object parses the Spark configuration at a given namespace and builds a common
+ * representation of the SSL settings, which is then used to produce protocol-specific
+ * configuration, such as a Typesafe Config object for Akka or an SslContextFactory for Jetty.
+ *
+ * SSL must be configured on each node and for each component that communicates over a given
+ * protocol. In YARN clusters, the key-store can be prepared on the client side and then
+ * distributed and used by the executors as part of the application (YARN allows the user to
+ * deploy files before the application is started).
+ * In standalone deployments, the user needs to provide key-stores and configuration options
+ * for the master and the workers. In this mode, the user may allow the executors to use the
+ * SSL settings inherited from the worker which spawned them. This is accomplished by setting
+ * `spark.ssl.useNodeLocalConf` to `true`.
*/
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+ extends Logging with SecretKeyHolder {
// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
)
}
+ // the default SSL configuration - it will be used by all communication layers unless overwritten
+ private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
+
+ // SSL configuration for different communication layers - they can override the default
+ // configuration at a specified namespace. The namespace *must* start with spark.ssl.
+ val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
+ val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+
+ logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
+ logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+
+ val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
+ val trustStoreManagers =
+ for (trustStore <- fileServerSSLOptions.trustStore) yield {
+ val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
+
+ try {
+ val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+ ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
+
+ val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+ tmf.init(ks)
+ tmf.getTrustManagers
+ } finally {
+ input.close()
+ }
+ }
+
+ lazy val credulousTrustStoreManagers = Array({
+ logWarning("Using 'accept-all' trust manager for SSL connections.")
+ new X509TrustManager {
+ override def getAcceptedIssuers: Array[X509Certificate] = null
+
+ override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+ override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+ }: TrustManager
+ })
+
+ val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
+ sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+ val hostVerifier = new HostnameVerifier {
+ override def verify(s: String, sslSession: SSLSession): Boolean = true
+ }
+
+ (Some(sslContext.getSocketFactory), Some(hostVerifier))
+ } else {
+ (None, None)
+ }
+
/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
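As a sketch of how the per-protocol views above behave, consider a SecurityManager built from a conf with a global spark.ssl block plus one fs-specific override. The object name and values are hypothetical, and the snippet again assumes the org.apache.spark package so the private[spark] members are visible; because no trust store is given, the client-side socket factory falls back to the 'accept-all' trust manager described above.

package org.apache.spark

// Sketch only: per-protocol SSL views exposed by SecurityManager.
object SecurityManagerSSLSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/etc/spark/keystore.jks")   // hypothetical; only read lazily by Jetty
      .set("spark.ssl.keyStorePassword", "secret")
      .set("spark.ssl.fs.protocol", "TLSv1")                  // applies to the file server only
      // No trust store: the file-server client side logs a warning and trusts all certificates.

    val sm = new SecurityManager(conf)
    println(sm.fileServerSSLOptions)          // enabled, protocol TLSv1, passwords masked
    println(sm.akkaSSLOptions)                // enabled, inherits only the global settings
    println(sm.sslSocketFactory.isDefined)    // true - used for https:// fetches from the file server
  }
}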
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4d4c69d42d..13aa9960ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,6 +370,7 @@ private[spark] object SparkConf {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
+ name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index ea98051532..1444c0dd3d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
uc = new URL(url).openConnection()
uc.setConnectTimeout(httpReadTimeout)
}
+ Utils.setupSecureURLConnection(uc, securityManager)
val in = {
uc.setReadTimeout(httpReadTimeout)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 65a1a8fd7e..ae55b4ff40 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(
val user = System.getProperty("user.name", "<unknown>")
+ def copy(
+ name: String = name,
+ maxCores: Option[Int] = maxCores,
+ memoryPerSlave: Int = memoryPerSlave,
+ command: Command = command,
+ appUiUrl: String = appUiUrl,
+ eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+ new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7c1c831c24..38b3da0b13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
val timeout = AkkaUtils.askTimeout(conf)
override def preStart() = {
- masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
+ masterActor = context.actorSelection(
+ Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -161,7 +162,7 @@ object Client {
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
- Master.toAkkaUrl(driverArgs.master)
+ Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 58c95dc4f9..b056a19ce6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
val command: Command)
extends Serializable {
+ def copy(
+ jarUrl: String = jarUrl,
+ mem: Int = mem,
+ cores: Int = cores,
+ supervise: Boolean = supervise,
+ command: Command = command): DriverDescription =
+ new DriverDescription(jarUrl, mem, cores, supervise, command)
+
override def toString: String = s"DriverDescription (${command.mainClass})"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 39a7b0319b..ffe940fbda 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -47,7 +47,7 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {
- val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -107,8 +107,9 @@ private[spark] class AppClient(
def changeMaster(url: String) {
// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
- master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = Master.toAkkaAddress(activeMasterUrl)
+ master = context.actorSelection(
+ Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem)))
+ masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem))
}
private def isPossibleMaster(remoteUrl: Address) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d92d99310a..5eeb9fe526 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -860,9 +860,9 @@ private[spark] object Master extends Logging {
*
* @throws SparkException if the url is invalid
*/
- def toAkkaUrl(sparkUrl: String): String = {
+ def toAkkaUrl(sparkUrl: String, protocol: String): String = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
- "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ AkkaUtils.address(protocol, systemName, host, port, actorName)
}
/**
@@ -870,9 +870,9 @@ private[spark] object Master extends Logging {
*
* @throws SparkException if the url is invalid
*/
- def toAkkaAddress(sparkUrl: String): Address = {
+ def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
- Address("akka.tcp", systemName, host, port)
+ Address(protocol, systemName, host, port)
}
def startSystemAndActor(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index acbdf0d8bd..bc9f78b9e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.logging.FileAppender
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1359983012..b20f5c0c82 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,8 +31,8 @@ import scala.util.Random
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -93,7 +93,12 @@ private[spark] class Worker(
var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
- val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
+ val akkaUrl = AkkaUtils.address(
+ AkkaUtils.protocol(context.system),
+ actorSystemName,
+ host,
+ port,
+ actorName)
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
@@ -174,8 +179,9 @@ private[spark] class Worker(
// activeMasterUrl it's a valid Spark url since we receive it from master.
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
- master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = Master.toAkkaAddress(activeMasterUrl)
+ master = context.actorSelection(
+ Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
+ masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
connected = true
// Cancel any outstanding re-registration attempts because we found a new master
registrationRetryTimer.foreach(_.cancel())
@@ -347,10 +353,20 @@ private[spark] class Worker(
}.toSeq
}
appDirectories(appId) = appLocalDirs
-
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
- ExecutorState.LOADING)
+ val manager = new ExecutorRunner(
+ appId,
+ execId,
+ appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
+ cores_,
+ memory_,
+ self,
+ workerId,
+ host,
+ sparkHome,
+ executorDir,
+ akkaUrl,
+ conf,
+ appLocalDirs, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -406,7 +422,14 @@ private[spark] class Worker(
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+ val driver = new DriverRunner(
+ conf,
+ driverId,
+ workDir,
+ sparkHome,
+ driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
+ self,
+ akkaUrl)
drivers(driverId) = driver
driver.start()
@@ -523,10 +546,32 @@ private[spark] object Worker extends Logging {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
- val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
+ private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
+ val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
+ val result = cmd.javaOpts.collectFirst {
+ case pattern(_result) => _result.toBoolean
+ }
+ result.getOrElse(false)
+ }
+
+ private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
+ val prefix = "spark.ssl."
+ val useNLC = "spark.ssl.useNodeLocalConf"
+ if (isUseLocalNodeSSLConfig(cmd)) {
+ val newJavaOpts = cmd.javaOpts
+ .filter(opt => !opt.startsWith(s"-D$prefix")) ++
+ conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+
+ s"-D$useNLC=true"
+ cmd.copy(javaOpts = newJavaOpts)
+ } else {
+ cmd
+ }
+ }
+
}
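Below is a sketch of the command rewriting performed by maybeUpdateSSLSettings. The object name, paths, and the exact Command field names are assumptions about this Spark version's deploy.Command case class, and the snippet is presumed to live inside the org.apache.spark package tree: when the submitted command carries -Dspark.ssl.useNodeLocalConf=true, the worker drops the submitter's -Dspark.ssl.* options and injects its own node-local ones.

package org.apache.spark.deploy.worker

import org.apache.spark.SparkConf
import org.apache.spark.deploy.Command

// Sketch only: node-local SSL settings replace the ones sent by the submitter.
object NodeLocalSSLSketch {
  def main(args: Array[String]): Unit = {
    val workerConf = new SparkConf(false)
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/etc/spark/worker-keystore.jks")   // hypothetical path

    val submitted = Command(                                         // field names assumed
      mainClass = "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      arguments = Seq.empty,
      environment = Map.empty,
      classPathEntries = Seq.empty,
      libraryPathEntries = Seq.empty,
      javaOpts = Seq(
        "-Dspark.ssl.useNodeLocalConf=true",
        "-Dspark.ssl.keyStore=/submitter/keystore.jks"))             // will be dropped

    val rewritten = Worker.maybeUpdateSSLSettings(submitted, workerConf)
    rewritten.javaOpts.foreach(println)
    // Prints the worker's own -Dspark.ssl.* options plus -Dspark.ssl.useNodeLocalConf=true.
  }
}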
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8238253026..bc72c89703 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val (fetcher, _) = AkkaUtils.createActorSystem(
- "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
+ "driverPropsFetcher",
+ hostname,
+ port,
+ executorConf,
+ new SecurityManager(executorConf))
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
fetcher.shutdown()
// Create SparkEnv using properties we fetched from the driver.
- val driverConf = new SparkConf().setAll(props)
+ val driverConf = new SparkConf()
+ for ((key, value) <- props) {
+ // this is required for SSL in standalone mode
+ if (SparkConf.isExecutorStartupConf(key)) {
+ driverConf.setIfMissing(key, value)
+ } else {
+ driverConf.set(key, value)
+ }
+ }
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
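The setIfMissing branch above matters because executor-startup settings, such as the node-local -Dspark.ssl.* options injected by the worker, are already present in the executor's local configuration and must not be clobbered by the values fetched from the driver. A minimal sketch of that merge rule follows; keys and paths are hypothetical and the snippet assumes the org.apache.spark package.

package org.apache.spark

// Sketch only: executor-startup confs already set locally win over driver-provided values.
object ExecutorConfMergeSketch {
  def main(args: Array[String]): Unit = {
    val driverProps = Seq(
      "spark.app.name" -> "demo",
      "spark.ssl.keyStore" -> "/driver/keystore.jks")          // hypothetical driver-side value

    val driverConf = new SparkConf(false)
      .set("spark.ssl.keyStore", "/worker/keystore.jks")       // node-local value from -D options

    for ((key, value) <- driverProps) {
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)                    // keeps the node-local SSL setting
      } else {
        driverConf.set(key, value)
      }
    }

    println(driverConf.get("spark.ssl.keyStore"))              // /worker/keystore.jks
    println(driverConf.get("spark.app.name"))                  // demo
  }
}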
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ee10aa061f..06786a5952 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.AkkaUtils
private[spark] class SimrSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -38,7 +39,8 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb87a564d..d2e1680a5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class SparkDeploySchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -46,7 +46,8 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5289661eb8..0d1c2a916c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, AkkaUtils}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -143,7 +143,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(sc.env.actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 4c9b1e3c46..3d9c6192ff 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Try
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -91,8 +92,11 @@ private[spark] object AkkaUtils extends Logging {
val secureCookie = if (isAuthOn) secretKey else ""
logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
- ConfigFactory.parseString(
+ val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+ .getOrElse(ConfigFactory.empty())
+
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+ .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -214,7 +218,7 @@ private[spark] object AkkaUtils extends Logging {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
- val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
+ val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
@@ -228,9 +232,33 @@ private[spark] object AkkaUtils extends Logging {
actorSystem: ActorSystem): ActorRef = {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
- val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+ val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
+
+ def protocol(actorSystem: ActorSystem): String = {
+ val akkaConf = actorSystem.settings.config
+ val sslProp = "akka.remote.netty.tcp.enable-ssl"
+ protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp))
+ }
+
+ def protocol(ssl: Boolean = false): String = {
+ if (ssl) {
+ "akka.ssl.tcp"
+ } else {
+ "akka.tcp"
+ }
+ }
+
+ def address(
+ protocol: String,
+ systemName: String,
+ host: String,
+ port: Any,
+ actorName: String): String = {
+ s"$protocol://$systemName@$host:$port/user/$actorName"
+ }
+
}
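A small sketch of the two helpers added above, with hypothetical system and host names and assuming the org.apache.spark.util package: with SSL enabled the actor URLs switch from the akka.tcp scheme to akka.ssl.tcp, which is why every place that used to hard-code "akka.tcp://..." now goes through protocol() and address().

package org.apache.spark.util

// Sketch only: actor URLs produced with and without SSL.
object AkkaAddressSketch {
  def main(args: Array[String]): Unit = {
    val plain  = AkkaUtils.address(AkkaUtils.protocol(ssl = false), "sparkMaster", "node1", 7077, "Master")
    val secure = AkkaUtils.address(AkkaUtils.protocol(ssl = true),  "sparkMaster", "node1", 7077, "Master")

    println(plain)   // akka.tcp://sparkMaster@node1:7077/user/Master
    println(secure)  // akka.ssl.tcp://sparkMaster@node1:7077/user/Master
  }
}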
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 31850b50bd..e9f2aed9ff 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,8 +21,9 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.{Locale, Properties, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import javax.net.ssl.HttpsURLConnection
import scala.collection.JavaConversions._
import scala.collection.Map
@@ -575,6 +576,7 @@ private[spark] object Utils extends Logging {
logDebug("fetchFile not using security")
uc = new URL(url).openConnection()
}
+ Utils.setupSecureURLConnection(uc, securityMgr)
val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
uc.setConnectTimeout(timeout)
@@ -1820,6 +1822,20 @@ private[spark] object Utils extends Logging {
PropertyConfigurator.configure(pro)
}
+ /**
+ * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and
+ * the host verifier from the given security manager.
+ */
+ def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
+ urlConnection match {
+ case https: HttpsURLConnection =>
+ sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
+ sm.hostnameVerifier.foreach(https.setHostnameVerifier)
+ https
+ case connection => connection
+ }
+ }
+
def invoke(
clazz: Class[_],
obj: AnyRef,
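Finally, a sketch of how setupSecureURLConnection is used by the fetch paths touched above; the URL and object name are hypothetical and the snippet assumes the org.apache.spark package. An https:// connection picks up the socket factory and host-name verifier from the SecurityManager, while a plain http:// connection is returned unchanged.

package org.apache.spark

import java.net.URL

import org.apache.spark.util.Utils

// Sketch only: https connections are wired to the SecurityManager's SSL settings before use.
object SecureFetchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.ssl.fs.enabled", "true")
      .set("spark.ssl.fs.protocol", "TLSv1")
    val securityManager = new SecurityManager(conf)

    // openConnection() does not connect yet, so this is safe to run offline.
    val uc = new URL("https://driver-host:12345/jars/app.jar").openConnection()
    Utils.setupSecureURLConnection(uc, securityManager)
    // uc is now configured with the SSL socket factory (and, with no trust store
    // configured, the accept-all trust manager) before any bytes are fetched.
  }
}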