aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/SSLOptions.scala178
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala100
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala67
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala20
-rw-r--r--core/src/test/resources/keystorebin0 -> 2247 bytes
-rw-r--r--core/src/test/resources/truststorebin0 -> 957 bytes
-rw-r--r--core/src/test/resources/untrusted-keystorebin0 -> 2246 bytes
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala90
-rw-r--r--core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala123
-rw-r--r--core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala55
-rw-r--r--core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala50
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala57
-rw-r--r--core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala197
-rw-r--r--docs/configuration.md80
-rw-r--r--docs/security.md24
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala8
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala4
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala7
36 files changed, 1145 insertions, 73 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index fa22787ce7..09a9ccc226 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.File
+import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
- val connector = new SocketConnector
+
+ val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
+ .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
}
/**
- * Get the URI of this HTTP server (http://host:port)
+ * Get the URI of this HTTP server (http://host:port or https://host:port)
*/
def uri: String = {
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
- "http://" + Utils.localIpAddress + ":" + port
+ val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
+ s"$scheme://${Utils.localIpAddress}:$port"
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
new file mode 100644
index 0000000000..2cdc167f85
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+/**
+ * SSLOptions class is a common container for SSL configuration options. It offers methods to
+ * generate specific objects to configure SSL for different communication protocols.
+ *
+ * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
+ * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
+ * authentication with SSL, SSLOptions cannot support it either.
+ *
+ * @param enabled enables or disables SSL; if it is set to false, the rest of the
+ * settings are disregarded
+ * @param keyStore a path to the key-store file
+ * @param keyStorePassword a password to access the key-store file
+ * @param keyPassword a password to access the private key in the key-store
+ * @param trustStore a path to the trust-store file
+ * @param trustStorePassword a password to access the trust-store file
+ * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
+ * @param enabledAlgorithms a set of encryption algorithms to use
+ */
+private[spark] case class SSLOptions(
+ enabled: Boolean = false,
+ keyStore: Option[File] = None,
+ keyStorePassword: Option[String] = None,
+ keyPassword: Option[String] = None,
+ trustStore: Option[File] = None,
+ trustStorePassword: Option[String] = None,
+ protocol: Option[String] = None,
+ enabledAlgorithms: Set[String] = Set.empty) {
+
+ /**
+ * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
+ */
+ def createJettySslContextFactory(): Option[SslContextFactory] = {
+ if (enabled) {
+ val sslContextFactory = new SslContextFactory()
+
+ keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
+ trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+ keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
+ trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+ keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+ protocol.foreach(sslContextFactory.setProtocol)
+ sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+
+ Some(sslContextFactory)
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Creates an Akka configuration object which contains all the SSL settings represented by this
+ * object. It can be used then to compose the ultimate Akka configuration.
+ */
+ def createAkkaConfig: Option[Config] = {
+ import scala.collection.JavaConversions._
+ if (enabled) {
+ Some(ConfigFactory.empty()
+ .withValue("akka.remote.netty.tcp.security.key-store",
+ ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.key-store-password",
+ ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.trust-store",
+ ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.trust-store-password",
+ ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.key-password",
+ ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.random-number-generator",
+ ConfigValueFactory.fromAnyRef(""))
+ .withValue("akka.remote.netty.tcp.security.protocol",
+ ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
+ .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
+ ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+ .withValue("akka.remote.netty.tcp.enable-ssl",
+ ConfigValueFactory.fromAnyRef(true)))
+ } else {
+ None
+ }
+ }
+
+ /** Returns a string representation of this SSLOptions with all the passwords masked. */
+ override def toString: String = s"SSLOptions{enabled=$enabled, " +
+ s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+ s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+ s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+
+}
+
+private[spark] object SSLOptions extends Logging {
+
+ /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+ *
+ * The following settings are allowed:
+ * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+ * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+ * $ - `[ns].keyStorePassword` - a password to the key-store file
+ * $ - `[ns].keyPassword` - a password to the private key
+ * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
+ * directory
+ * $ - `[ns].trustStorePassword` - a password to the trust-store file
+ * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+ * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+ *
+ * For a list of protocols and ciphers supported by particular Java versions, you may go to
+ * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
+ * blog page]].
+ *
+ * You can optionally specify the default configuration. If you do, for each setting which is
+ * missing in SparkConf, the corresponding setting is used from the default configuration.
+ *
+ * @param conf Spark configuration object where the settings are collected from
+ * @param ns the namespace name
+ * @param defaults the default configuration
+ * @return [[org.apache.spark.SSLOptions]] object
+ */
+ def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
+ val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+ val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+ .orElse(defaults.flatMap(_.keyStore))
+
+ val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+ .orElse(defaults.flatMap(_.keyStorePassword))
+
+ val keyPassword = conf.getOption(s"$ns.keyPassword")
+ .orElse(defaults.flatMap(_.keyPassword))
+
+ val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+ .orElse(defaults.flatMap(_.trustStore))
+
+ val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+ .orElse(defaults.flatMap(_.trustStorePassword))
+
+ val protocol = conf.getOption(s"$ns.protocol")
+ .orElse(defaults.flatMap(_.protocol))
+
+ val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
+ .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+ .orElse(defaults.map(_.enabledAlgorithms))
+ .getOrElse(Set.empty)
+
+ new SSLOptions(
+ enabled,
+ keyStore,
+ keyStorePassword,
+ keyPassword,
+ trustStore,
+ trustStorePassword,
+ protocol,
+ enabledAlgorithms)
+ }
+
+}
+
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index ec82d09cd0..88d35a4bac 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
package org.apache.spark
import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._
+import com.google.common.io.Files
import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
@@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
* who always have permission to view or modify the Spark application.
*
- * Spark does not currently support encryption after authentication.
+ * Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
*
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
@@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
- * Akka also has an option to turn on SSL, this option is not currently supported
- * but we could add a configuration option in the future.
+ *
+ * Akka also has an option to turn on SSL, this option is currently supported (see
+ * the details below).
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
@@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
- * We currently do not support SSL (https), but Jetty can be configured to use it
- * so we could add a configuration option for this in the future.
+ *
+ * We currently support SSL (https) for this communication protocol (see the details
+ * below).
*
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
* Any clients must specify the user and password. There is a default
@@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
+ *
+ * Connection encryption (SSL) configuration is organized hierarchically. The user can configure
+ * the default SSL settings which will be used for all the supported communication protocols unless
+ * they are overwritten by protocol specific settings. This way the user can easily provide the
+ * common settings for all the protocols without disabling the ability to configure each one
+ * individually.
+ *
+ * All the SSL settings like `spark.ssl.xxx` where `xxx` is a particular configuration property,
+ * denote the global configuration for all the supported protocols. In order to override the global
+ * configuration for the particular protocol, the properties must be overwritten in the
+ * protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
+ * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
+ * Akka based connections or `fs` for broadcast and file server.
+ *
+ * Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
+ * options that can be specified.
+ *
+ * SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
+ * object parses Spark configuration at a given namespace and builds the common representation
+ * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
+ * TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ *
+ * SSL must be configured on each node and configured for each component involved in
+ * communication using the particular protocol. In YARN clusters, the key-store can be prepared on
+ * the client side then distributed and used by the executors as the part of the application
+ * (YARN allows the user to deploy files before the application is started).
+ * In standalone deployment, the user needs to provide key-stores and configuration
+ * options for master and workers. In this mode, the user may allow the executors to use the SSL
+ * settings inherited from the worker which spawned that executor. It can be accomplished by
+ * setting `spark.ssl.useNodeLocalConf` to `true`.
*/
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+ extends Logging with SecretKeyHolder {
// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
)
}
+ // the default SSL configuration - it will be used by all communication layers unless overwritten
+ private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
+
+ // SSL configuration for different communication layers - they can override the default
+ // configuration at a specified namespace. The namespace *must* start with spark.ssl.
+ val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
+ val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+
+ logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
+ logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+
+ val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
+ val trustStoreManagers =
+ for (trustStore <- fileServerSSLOptions.trustStore) yield {
+ val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
+
+ try {
+ val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+ ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
+
+ val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+ tmf.init(ks)
+ tmf.getTrustManagers
+ } finally {
+ input.close()
+ }
+ }
+
+ lazy val credulousTrustStoreManagers = Array({
+ logWarning("Using 'accept-all' trust manager for SSL connections.")
+ new X509TrustManager {
+ override def getAcceptedIssuers: Array[X509Certificate] = null
+
+ override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+ override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+ }: TrustManager
+ })
+
+ val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
+ sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+ val hostVerifier = new HostnameVerifier {
+ override def verify(s: String, sslSession: SSLSession): Boolean = true
+ }
+
+ (Some(sslContext.getSocketFactory), Some(hostVerifier))
+ } else {
+ (None, None)
+ }
+
/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4d4c69d42d..13aa9960ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,6 +370,7 @@ private[spark] object SparkConf {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
+ name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index ea98051532..1444c0dd3d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
uc = new URL(url).openConnection()
uc.setConnectTimeout(httpReadTimeout)
}
+ Utils.setupSecureURLConnection(uc, securityManager)
val in = {
uc.setReadTimeout(httpReadTimeout)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 65a1a8fd7e..ae55b4ff40 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(
val user = System.getProperty("user.name", "<unknown>")
+ def copy(
+ name: String = name,
+ maxCores: Option[Int] = maxCores,
+ memoryPerSlave: Int = memoryPerSlave,
+ command: Command = command,
+ appUiUrl: String = appUiUrl,
+ eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+ new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7c1c831c24..38b3da0b13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
val timeout = AkkaUtils.askTimeout(conf)
override def preStart() = {
- masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
+ masterActor = context.actorSelection(
+ Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -161,7 +162,7 @@ object Client {
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
- Master.toAkkaUrl(driverArgs.master)
+ Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 58c95dc4f9..b056a19ce6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
val command: Command)
extends Serializable {
+ def copy(
+ jarUrl: String = jarUrl,
+ mem: Int = mem,
+ cores: Int = cores,
+ supervise: Boolean = supervise,
+ command: Command = command): DriverDescription =
+ new DriverDescription(jarUrl, mem, cores, supervise, command)
+
override def toString: String = s"DriverDescription (${command.mainClass})"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 39a7b0319b..ffe940fbda 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -47,7 +47,7 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {
- val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -107,8 +107,9 @@ private[spark] class AppClient(
def changeMaster(url: String) {
// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
- master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = Master.toAkkaAddress(activeMasterUrl)
+ master = context.actorSelection(
+ Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem)))
+ masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem))
}
private def isPossibleMaster(remoteUrl: Address) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d92d99310a..5eeb9fe526 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -860,9 +860,9 @@ private[spark] object Master extends Logging {
*
* @throws SparkException if the url is invalid
*/
- def toAkkaUrl(sparkUrl: String): String = {
+ def toAkkaUrl(sparkUrl: String, protocol: String): String = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
- "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ AkkaUtils.address(protocol, systemName, host, port, actorName)
}
/**
@@ -870,9 +870,9 @@ private[spark] object Master extends Logging {
*
* @throws SparkException if the url is invalid
*/
- def toAkkaAddress(sparkUrl: String): Address = {
+ def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
- Address("akka.tcp", systemName, host, port)
+ Address(protocol, systemName, host, port)
}
def startSystemAndActor(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index acbdf0d8bd..bc9f78b9e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.logging.FileAppender
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1359983012..b20f5c0c82 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,8 +31,8 @@ import scala.util.Random
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -93,7 +93,12 @@ private[spark] class Worker(
var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
- val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
+ val akkaUrl = AkkaUtils.address(
+ AkkaUtils.protocol(context.system),
+ actorSystemName,
+ host,
+ port,
+ actorName)
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
@@ -174,8 +179,9 @@ private[spark] class Worker(
// activeMasterUrl it's a valid Spark url since we receive it from master.
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
- master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = Master.toAkkaAddress(activeMasterUrl)
+ master = context.actorSelection(
+ Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
+ masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
connected = true
// Cancel any outstanding re-registration attempts because we found a new master
registrationRetryTimer.foreach(_.cancel())
@@ -347,10 +353,20 @@ private[spark] class Worker(
}.toSeq
}
appDirectories(appId) = appLocalDirs
-
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
- ExecutorState.LOADING)
+ val manager = new ExecutorRunner(
+ appId,
+ execId,
+ appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
+ cores_,
+ memory_,
+ self,
+ workerId,
+ host,
+ sparkHome,
+ executorDir,
+ akkaUrl,
+ conf,
+ appLocalDirs, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -406,7 +422,14 @@ private[spark] class Worker(
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+ val driver = new DriverRunner(
+ conf,
+ driverId,
+ workDir,
+ sparkHome,
+ driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
+ self,
+ akkaUrl)
drivers(driverId) = driver
driver.start()
@@ -523,10 +546,32 @@ private[spark] object Worker extends Logging {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
- val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
+ private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
+ val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
+ val result = cmd.javaOpts.collectFirst {
+ case pattern(_result) => _result.toBoolean
+ }
+ result.getOrElse(false)
+ }
+
+ private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
+ val prefix = "spark.ssl."
+ val useNLC = "spark.ssl.useNodeLocalConf"
+ if (isUseLocalNodeSSLConfig(cmd)) {
+ val newJavaOpts = cmd.javaOpts
+ .filter(opt => !opt.startsWith(s"-D$prefix")) ++
+ conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+
+ s"-D$useNLC=true"
+ cmd.copy(javaOpts = newJavaOpts)
+ } else {
+ cmd
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8238253026..bc72c89703 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val (fetcher, _) = AkkaUtils.createActorSystem(
- "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
+ "driverPropsFetcher",
+ hostname,
+ port,
+ executorConf,
+ new SecurityManager(executorConf))
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
fetcher.shutdown()
// Create SparkEnv using properties we fetched from the driver.
- val driverConf = new SparkConf().setAll(props)
+ val driverConf = new SparkConf()
+ for ((key, value) <- props) {
+ // this is required for SSL in standalone mode
+ if (SparkConf.isExecutorStartupConf(key)) {
+ driverConf.setIfMissing(key, value)
+ } else {
+ driverConf.set(key, value)
+ }
+ }
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ee10aa061f..06786a5952 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.AkkaUtils
private[spark] class SimrSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -38,7 +39,8 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb87a564d..d2e1680a5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class SparkDeploySchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -46,7 +46,8 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5289661eb8..0d1c2a916c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, AkkaUtils}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -143,7 +143,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(sc.env.actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 4c9b1e3c46..3d9c6192ff 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Try
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -91,8 +92,11 @@ private[spark] object AkkaUtils extends Logging {
val secureCookie = if (isAuthOn) secretKey else ""
logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
- ConfigFactory.parseString(
+ val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+ .getOrElse(ConfigFactory.empty())
+
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+ .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -214,7 +218,7 @@ private[spark] object AkkaUtils extends Logging {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
- val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
+ val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
@@ -228,9 +232,33 @@ private[spark] object AkkaUtils extends Logging {
actorSystem: ActorSystem): ActorRef = {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
- val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+ val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
+
+ def protocol(actorSystem: ActorSystem): String = {
+ val akkaConf = actorSystem.settings.config
+ val sslProp = "akka.remote.netty.tcp.enable-ssl"
+ protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp))
+ }
+
+ def protocol(ssl: Boolean = false): String = {
+ if (ssl) {
+ "akka.ssl.tcp"
+ } else {
+ "akka.tcp"
+ }
+ }
+
+ def address(
+ protocol: String,
+ systemName: String,
+ host: String,
+ port: Any,
+ actorName: String): String = {
+ s"$protocol://$systemName@$host:$port/user/$actorName"
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 31850b50bd..e9f2aed9ff 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,8 +21,9 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.{Locale, Properties, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import javax.net.ssl.HttpsURLConnection
import scala.collection.JavaConversions._
import scala.collection.Map
@@ -575,6 +576,7 @@ private[spark] object Utils extends Logging {
logDebug("fetchFile not using security")
uc = new URL(url).openConnection()
}
+ Utils.setupSecureURLConnection(uc, securityMgr)
val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
uc.setConnectTimeout(timeout)
@@ -1820,6 +1822,20 @@ private[spark] object Utils extends Logging {
PropertyConfigurator.configure(pro)
}
+ /**
+ * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and
+ * the host verifier from the given security manager.
+ */
+ def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
+ urlConnection match {
+ case https: HttpsURLConnection =>
+ sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
+ sm.hostnameVerifier.foreach(https.setHostnameVerifier)
+ https
+ case connection => connection
+ }
+ }
+
def invoke(
clazz: Class[_],
obj: AnyRef,
diff --git a/core/src/test/resources/keystore b/core/src/test/resources/keystore
new file mode 100644
index 0000000000..f8310e39ba
--- /dev/null
+++ b/core/src/test/resources/keystore
Binary files differ
diff --git a/core/src/test/resources/truststore b/core/src/test/resources/truststore
new file mode 100644
index 0000000000..a6b1d46e1f
--- /dev/null
+++ b/core/src/test/resources/truststore
Binary files differ
diff --git a/core/src/test/resources/untrusted-keystore b/core/src/test/resources/untrusted-keystore
new file mode 100644
index 0000000000..6015b02caa
--- /dev/null
+++ b/core/src/test/resources/untrusted-keystore
Binary files differ
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 0f49ce4754..5fdf6bc277 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -18,13 +18,19 @@
package org.apache.spark
import java.io._
+import java.net.URI
import java.util.jar.{JarEntry, JarOutputStream}
+import javax.net.ssl.SSLHandshakeException
import com.google.common.io.ByteStreams
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.commons.lang3.RandomUtils
import org.scalatest.FunSuite
import org.apache.spark.util.Utils
+import SSLSampleConfigs._
+
class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpDir: File = _
@@ -168,4 +174,88 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
}
+ test ("HttpFileServer should work with SSL") {
+ val sparkConf = sparkSSLConfig()
+ val sm = new SecurityManager(sparkConf)
+ val server = new HttpFileServer(sparkConf, sm, 0)
+ try {
+ server.initialize()
+
+ fileTransferTest(server, sm)
+ } finally {
+ server.stop()
+ }
+ }
+
+ test ("HttpFileServer should work with SSL and good credentials") {
+ val sparkConf = sparkSSLConfig()
+ sparkConf.set("spark.authenticate", "true")
+ sparkConf.set("spark.authenticate.secret", "good")
+
+ val sm = new SecurityManager(sparkConf)
+ val server = new HttpFileServer(sparkConf, sm, 0)
+ try {
+ server.initialize()
+
+ fileTransferTest(server, sm)
+ } finally {
+ server.stop()
+ }
+ }
+
+ test ("HttpFileServer should not work with valid SSL and bad credentials") {
+ val sparkConf = sparkSSLConfig()
+ sparkConf.set("spark.authenticate", "true")
+ sparkConf.set("spark.authenticate.secret", "bad")
+
+ val sm = new SecurityManager(sparkConf)
+ val server = new HttpFileServer(sparkConf, sm, 0)
+ try {
+ server.initialize()
+
+ intercept[IOException] {
+ fileTransferTest(server)
+ }
+ } finally {
+ server.stop()
+ }
+ }
+
+ test ("HttpFileServer should not work with SSL when the server is untrusted") {
+ val sparkConf = sparkSSLConfigUntrusted()
+ val sm = new SecurityManager(sparkConf)
+ val server = new HttpFileServer(sparkConf, sm, 0)
+ try {
+ server.initialize()
+
+ intercept[SSLHandshakeException] {
+ fileTransferTest(server)
+ }
+ } finally {
+ server.stop()
+ }
+ }
+
+ def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
+ val randomContent = RandomUtils.nextBytes(100)
+ val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
+ FileUtils.writeByteArrayToFile(file, randomContent)
+ server.addFile(file)
+
+ val uri = new URI(server.serverUri + "/files/" + file.getName)
+
+ val connection = if (sm != null && sm.isAuthenticationEnabled()) {
+ Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
+ } else {
+ uri.toURL.openConnection()
+ }
+
+ if (sm != null) {
+ Utils.setupSecureURLConnection(connection, sm)
+ }
+
+ val buf = IOUtils.toByteArray(connection.getInputStream)
+ assert(buf === randomContent)
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d27880f4bc..ccfe0678cb 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -120,7 +120,7 @@ class MapOutputTrackerSuite extends FunSuite {
securityManager = new SecurityManager(conf))
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
- s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
val timeout = AkkaUtils.lookupTimeout(conf)
slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
new file mode 100644
index 0000000000..444a33371b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
+
+ test("test resolving property file as spark conf ") {
+ val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+ val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+ val conf = new SparkConf
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ssl.protocol", "SSLv3")
+
+ val opts = SSLOptions.parse(conf, "spark.ssl")
+
+ assert(opts.enabled === true)
+ assert(opts.trustStore.isDefined === true)
+ assert(opts.trustStore.get.getName === "truststore")
+ assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.keyStore.isDefined === true)
+ assert(opts.keyStore.get.getName === "keystore")
+ assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+ assert(opts.trustStorePassword === Some("password"))
+ assert(opts.keyStorePassword === Some("password"))
+ assert(opts.keyPassword === Some("password"))
+ assert(opts.protocol === Some("SSLv3"))
+ assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+ }
+
+ test("test resolving property with defaults specified ") {
+ val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+ val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+ val conf = new SparkConf
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ssl.protocol", "SSLv3")
+
+ val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+ val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+ assert(opts.enabled === true)
+ assert(opts.trustStore.isDefined === true)
+ assert(opts.trustStore.get.getName === "truststore")
+ assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.keyStore.isDefined === true)
+ assert(opts.keyStore.get.getName === "keystore")
+ assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+ assert(opts.trustStorePassword === Some("password"))
+ assert(opts.keyStorePassword === Some("password"))
+ assert(opts.keyPassword === Some("password"))
+ assert(opts.protocol === Some("SSLv3"))
+ assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+ }
+
+ test("test whether defaults can be overridden ") {
+ val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+ val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+ val conf = new SparkConf
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ui.ssl.enabled", "false")
+ conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ui.ssl.keyStorePassword", "12345")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
+ conf.set("spark.ssl.protocol", "SSLv3")
+
+ val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+ val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+ assert(opts.enabled === false)
+ assert(opts.trustStore.isDefined === true)
+ assert(opts.trustStore.get.getName === "truststore")
+ assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.keyStore.isDefined === true)
+ assert(opts.keyStore.get.getName === "keystore")
+ assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+ assert(opts.trustStorePassword === Some("password"))
+ assert(opts.keyStorePassword === Some("12345"))
+ assert(opts.keyPassword === Some("password"))
+ assert(opts.protocol === Some("SSLv3"))
+ assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
new file mode 100644
index 0000000000..ace8123a89
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+object SSLSampleConfigs {
+ val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+ val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
+ val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+ def sparkSSLConfig() = {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.enabledAlgorithms",
+ "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+ conf.set("spark.ssl.protocol", "TLSv1")
+ conf
+ }
+
+ def sparkSSLConfigUntrusted() = {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ssl.keyStore", untrustedKeyStorePath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.enabledAlgorithms",
+ "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+ conf.set("spark.ssl.protocol", "TLSv1")
+ conf
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index fcca0867b8..43fbd3ff3f 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import scala.collection.mutable.ArrayBuffer
+import java.io.File
import org.scalatest.FunSuite
@@ -125,6 +125,54 @@ class SecurityManagerSuite extends FunSuite {
}
+ test("ssl on setup") {
+ val conf = SSLSampleConfigs.sparkSSLConfig()
+
+ val securityManager = new SecurityManager(conf)
+
+ assert(securityManager.fileServerSSLOptions.enabled === true)
+ assert(securityManager.akkaSSLOptions.enabled === true)
+
+ assert(securityManager.sslSocketFactory.isDefined === true)
+ assert(securityManager.hostnameVerifier.isDefined === true)
+
+ assert(securityManager.fileServerSSLOptions.trustStore.isDefined === true)
+ assert(securityManager.fileServerSSLOptions.trustStore.get.getName === "truststore")
+ assert(securityManager.fileServerSSLOptions.keyStore.isDefined === true)
+ assert(securityManager.fileServerSSLOptions.keyStore.get.getName === "keystore")
+ assert(securityManager.fileServerSSLOptions.trustStorePassword === Some("password"))
+ assert(securityManager.fileServerSSLOptions.keyStorePassword === Some("password"))
+ assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
+ assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1"))
+ assert(securityManager.fileServerSSLOptions.enabledAlgorithms ===
+ Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
+
+ assert(securityManager.akkaSSLOptions.trustStore.isDefined === true)
+ assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore")
+ assert(securityManager.akkaSSLOptions.keyStore.isDefined === true)
+ assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore")
+ assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password"))
+ assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password"))
+ assert(securityManager.akkaSSLOptions.keyPassword === Some("password"))
+ assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1"))
+ assert(securityManager.akkaSSLOptions.enabledAlgorithms ===
+ Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
+ }
+
+ test("ssl off setup") {
+ val file = File.createTempFile("SSLOptionsSuite", "conf")
+ file.deleteOnExit()
+
+ System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
+ val conf = new SparkConf()
+
+ val securityManager = new SecurityManager(conf)
+
+ assert(securityManager.fileServerSSLOptions.enabled === false)
+ assert(securityManager.akkaSSLOptions.enabled === false)
+ assert(securityManager.sslSocketFactory.isDefined === false)
+ assert(securityManager.hostnameVerifier.isDefined === false)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
index d2dae34be7..518073dcbb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.Matchers
class ClientSuite extends FunSuite with Matchers {
test("correctly validates driver jar URL's") {
ClientArguments.isValidJarUrl("http://someHost:8080/foo.jar") should be (true)
+ ClientArguments.isValidJarUrl("https://someHost:8080/foo.jar") should be (true)
// file scheme with authority and path is valid.
ClientArguments.isValidJarUrl("file://somehost/path/to/a/jarFile.jar") should be (true)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 3d2335f9b3..34c74d87f0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -20,30 +20,46 @@ package org.apache.spark.deploy.master
import akka.actor.Address
import org.scalatest.FunSuite
-import org.apache.spark.SparkException
+import org.apache.spark.{SSLOptions, SparkConf, SparkException}
class MasterSuite extends FunSuite {
test("toAkkaUrl") {
- val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
+ val conf = new SparkConf(loadDefaults = false)
+ val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.tcp")
assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
}
+ test("toAkkaUrl with SSL") {
+ val conf = new SparkConf(loadDefaults = false)
+ val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.ssl.tcp")
+ assert("akka.ssl.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
+ }
+
test("toAkkaUrl: a typo url") {
+ val conf = new SparkConf(loadDefaults = false)
val e = intercept[SparkException] {
- Master.toAkkaUrl("spark://1.2. 3.4:1234")
+ Master.toAkkaUrl("spark://1.2. 3.4:1234", "akka.tcp")
}
assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
}
test("toAkkaAddress") {
- val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
+ val conf = new SparkConf(loadDefaults = false)
+ val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.tcp")
assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
}
+ test("toAkkaAddress with SSL") {
+ val conf = new SparkConf(loadDefaults = false)
+ val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.ssl.tcp")
+ assert(Address("akka.ssl.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
+ }
+
test("toAkkaAddress: a typo url") {
+ val conf = new SparkConf(loadDefaults = false)
val e = intercept[SparkException] {
- Master.toAkkaAddress("spark://1.2. 3.4:1234")
+ Master.toAkkaAddress("spark://1.2. 3.4:1234", "akka.tcp")
}
assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
new file mode 100644
index 0000000000..84e2fd7ad9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.Command
+
+import org.scalatest.{Matchers, FunSuite}
+
+class WorkerSuite extends FunSuite with Matchers {
+
+ def cmd(javaOpts: String*) = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
+ def conf(opts: (String, String)*) = new SparkConf(loadDefaults = false).setAll(opts)
+
+ test("test isUseLocalNodeSSLConfig") {
+ Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
+ Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
+ Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=false")) shouldBe false
+ Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=")) shouldBe false
+ }
+
+ test("test maybeUpdateSSLSettings") {
+ Worker.maybeUpdateSSLSettings(
+ cmd("-Dasdf=dfgh", "-Dspark.ssl.opt1=x"),
+ conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+ .javaOpts should contain theSameElementsInOrderAs Seq(
+ "-Dasdf=dfgh", "-Dspark.ssl.opt1=x")
+
+ Worker.maybeUpdateSSLSettings(
+ cmd("-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x"),
+ conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+ .javaOpts should contain theSameElementsInOrderAs Seq(
+ "-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x")
+
+ Worker.maybeUpdateSSLSettings(
+ cmd("-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=x"),
+ conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+ .javaOpts should contain theSameElementsAs Seq(
+ "-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
+
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 6bbf72e929..39e5d367d6 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import java.util.concurrent.TimeoutException
+
import scala.concurrent.Await
import akka.actor._
@@ -26,6 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.SSLSampleConfigs._
/**
@@ -47,7 +50,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
val badconf = new SparkConf
badconf.set("spark.authenticate", "true")
@@ -60,7 +63,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
conf = conf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
- s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
val timeout = AkkaUtils.lookupTimeout(conf)
intercept[akka.actor.ActorNotFound] {
slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -74,7 +77,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val conf = new SparkConf
conf.set("spark.authenticate", "false")
conf.set("spark.authenticate.secret", "bad")
- val securityManager = new SecurityManager(conf);
+ val securityManager = new SecurityManager(conf)
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -85,18 +88,18 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
val badconf = new SparkConf
badconf.set("spark.authenticate", "false")
badconf.set("spark.authenticate.secret", "good")
- val securityManagerBad = new SecurityManager(badconf);
+ val securityManagerBad = new SecurityManager(badconf)
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = badconf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
- s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
val timeout = AkkaUtils.lookupTimeout(conf)
slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -124,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val conf = new SparkConf
conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf);
+ val securityManager = new SecurityManager(conf)
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -135,12 +138,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
val goodconf = new SparkConf
goodconf.set("spark.authenticate", "true")
goodconf.set("spark.authenticate.secret", "good")
- val securityManagerGood = new SecurityManager(goodconf);
+ val securityManagerGood = new SecurityManager(goodconf)
assert(securityManagerGood.isAuthenticationEnabled() === true)
@@ -148,7 +151,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
conf = goodconf, securityManager = securityManagerGood)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
- s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
val timeout = AkkaUtils.lookupTimeout(conf)
slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -175,7 +178,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf);
+ val securityManager = new SecurityManager(conf)
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -186,12 +189,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
val badconf = new SparkConf
badconf.set("spark.authenticate", "false")
badconf.set("spark.authenticate.secret", "bad")
- val securityManagerBad = new SecurityManager(badconf);
+ val securityManagerBad = new SecurityManager(badconf)
assert(securityManagerBad.isAuthenticationEnabled() === false)
@@ -199,7 +202,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
conf = badconf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
- s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
val timeout = AkkaUtils.lookupTimeout(conf)
intercept[akka.actor.ActorNotFound] {
slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -209,4 +212,170 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
slaveSystem.shutdown()
}
+ test("remote fetch ssl on") {
+ val conf = sparkSSLConfig()
+ val securityManager = new SecurityManager(conf)
+
+ val hostname = "localhost"
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ conf = conf, securityManager = securityManager)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+ assert(securityManager.isAuthenticationEnabled() === false)
+
+ val masterTracker = new MapOutputTrackerMaster(conf)
+ masterTracker.trackerActor = actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+ val slaveConf = sparkSSLConfig()
+ val securityManagerBad = new SecurityManager(slaveConf)
+
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ conf = slaveConf, securityManager = securityManagerBad)
+ val slaveTracker = new MapOutputTrackerWorker(conf)
+ val selection = slaveSystem.actorSelection(
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+ assert(securityManagerBad.isAuthenticationEnabled() === false)
+
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0,
+ MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+ // this should succeed since security off
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
+ }
+
+
+ test("remote fetch ssl on and security enabled") {
+ val conf = sparkSSLConfig()
+ conf.set("spark.authenticate", "true")
+ conf.set("spark.authenticate.secret", "good")
+ val securityManager = new SecurityManager(conf)
+
+ val hostname = "localhost"
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ conf = conf, securityManager = securityManager)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+ assert(securityManager.isAuthenticationEnabled() === true)
+
+ val masterTracker = new MapOutputTrackerMaster(conf)
+ masterTracker.trackerActor = actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+ val slaveConf = sparkSSLConfig()
+ slaveConf.set("spark.authenticate", "true")
+ slaveConf.set("spark.authenticate.secret", "good")
+ val securityManagerBad = new SecurityManager(slaveConf)
+
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ conf = slaveConf, securityManager = securityManagerBad)
+ val slaveTracker = new MapOutputTrackerWorker(conf)
+ val selection = slaveSystem.actorSelection(
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+ assert(securityManagerBad.isAuthenticationEnabled() === true)
+
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0,
+ MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
+ }
+
+
+ test("remote fetch ssl on and security enabled - bad credentials") {
+ val conf = sparkSSLConfig()
+ conf.set("spark.authenticate", "true")
+ conf.set("spark.authenticate.secret", "good")
+ val securityManager = new SecurityManager(conf)
+
+ val hostname = "localhost"
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ conf = conf, securityManager = securityManager)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+ assert(securityManager.isAuthenticationEnabled() === true)
+
+ val masterTracker = new MapOutputTrackerMaster(conf)
+ masterTracker.trackerActor = actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+ val slaveConf = sparkSSLConfig()
+ slaveConf.set("spark.authenticate", "true")
+ slaveConf.set("spark.authenticate.secret", "bad")
+ val securityManagerBad = new SecurityManager(slaveConf)
+
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ conf = slaveConf, securityManager = securityManagerBad)
+ val slaveTracker = new MapOutputTrackerWorker(conf)
+ val selection = slaveSystem.actorSelection(
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ }
+
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
+ }
+
+
+ test("remote fetch ssl on - untrusted server") {
+ val conf = sparkSSLConfigUntrusted()
+ val securityManager = new SecurityManager(conf)
+
+ val hostname = "localhost"
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ conf = conf, securityManager = securityManager)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+ assert(securityManager.isAuthenticationEnabled() === false)
+
+ val masterTracker = new MapOutputTrackerMaster(conf)
+ masterTracker.trackerActor = actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+ val slaveConf = sparkSSLConfig()
+ val securityManagerBad = new SecurityManager(slaveConf)
+
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ conf = slaveConf, securityManager = securityManagerBad)
+ val slaveTracker = new MapOutputTrackerWorker(conf)
+ val selection = slaveSystem.actorSelection(
+ AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ intercept[TimeoutException] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ }
+
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
+ }
+
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 08c6befaf3..62d3fca937 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1242,6 +1242,86 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>
+#### Encryption
+
+<table class="table">
+ <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+ <tr>
+ <td><code>spark.ssl.enabled</code></td>
+ <td>false</td>
+ <td>
+ <p>Whether to enable SSL connections on all supported protocols.</p>
+
+ <p>All the SSL settings like <code>spark.ssl.xxx</code> where <code>xxx</code> is a
+ particular configuration property, denote the global configuration for all the supported
+ protocols. In order to override the global configuration for the particular protocol,
+ the properties must be overwritten in the protocol-specific namespace.</p>
+
+ <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
+ particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
+ either <code>akka</code> for Akka based connections or <code>fs</code> for broadcast and
+ file server.</p>
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.keyStore</code></td>
+ <td>None</td>
+ <td>
+ A path to a key-store file. The path can be absolute or relative to the directory where
+ the component is started in.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.keyStorePassword</code></td>
+ <td>None</td>
+ <td>
+ A password to the key-store.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.keyPassword</code></td>
+ <td>None</td>
+ <td>
+ A password to the private key in key-store.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.trustStore</code></td>
+ <td>None</td>
+ <td>
+ A path to a trust-store file. The path can be absolute or relative to the directory
+ where the component is started in.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.trustStorePassword</code></td>
+ <td>None</td>
+ <td>
+ A password to the trust-store.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.protocol</code></td>
+ <td>None</td>
+ <td>
+ A protocol name. The protocol must be supported by JVM. The reference list of protocols
+ one can find on <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a>
+ page.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.ssl.enabledAlgorithms</code></td>
+ <td>Empty</td>
+ <td>
+ A comma separated list of ciphers. The specified ciphers must be supported by JVM.
+ The reference list of protocols one can find on
+ <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a>
+ page.
+ </td>
+ </tr>
+</table>
+
+
#### Spark Streaming
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
diff --git a/docs/security.md b/docs/security.md
index 1e206a139f..6e0a54fbc4 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -20,6 +20,30 @@ Spark allows for a set of administrators to be specified in the acls who always
If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
+## Encryption
+
+Spark supports SSL for Akka and HTTP (for broadcast and file server) protocols. However SSL is not supported yet for WebUI and block transfer service.
+
+Connection encryption (SSL) configuration is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
+
+SSL must be configured on each node and configured for each component involved in communication using the particular protocol.
+
+### YARN mode
+The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
+
+### Standalone mode
+The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
+
+### Preparing the key-stores
+Key-stores can be generated by `keytool` program. The reference documentation for this tool is
+[here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). The most basic
+steps to configure the key-stores and the trust-store for the standalone deployment mode is as
+follows:
+* Generate a keys pair for each node
+* Export the public key of the key pair to a file on each node
+* Import all exported public keys into a single trust-store
+* Distribute the trust-store over the nodes
+
## Configuring Ports for Network Security
Spark makes heavy use of the network, and some environments have strict requirements for using tight
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index b46df12da8..9805609120 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -45,7 +45,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
// Hadoop FileSystem object for our URI, if it isn't using HTTP
var fileSystem: FileSystem = {
- if (uri.getScheme() == "http") {
+ if (Set("http", "https", "ftp").contains(uri.getScheme)) {
null
} else {
FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
@@ -78,13 +78,16 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
if (fileSystem != null) {
fileSystem.open(new Path(directory, pathInDirectory))
} else {
- if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+ val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
- newuri.toURL().openStream()
+ newuri.toURL
} else {
- new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ new URL(classUri + "/" + urlEncode(pathInDirectory))
}
+
+ Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager)
+ .getInputStream
}
}
val bytes = readAndTransformClass(name, inputStream)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 716cf2c7f3..7d29ed88cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -67,8 +67,12 @@ private[streaming] class ReceiverSupervisorImpl(
private val trackerActor = {
val ip = env.conf.get("spark.driver.host", "localhost")
val port = env.conf.getInt("spark.driver.port", 7077)
- val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
- SparkEnv.driverActorSystemName, ip, port)
+ val url = AkkaUtils.address(
+ AkkaUtils.protocol(env.actorSystem),
+ SparkEnv.driverActorSystemName,
+ ip,
+ port,
+ "ReceiverTracker")
env.actorSystem.actorSelection(url)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eb328b2b8a..37e98e01fd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -244,7 +244,9 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isDriver: Boolean): Unit = {
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+
+ val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
host,
port,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 040406c150..0dbb6154b3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,9 +34,10 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.AkkaUtils
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -106,7 +107,9 @@ private[yarn] class YarnAllocator(
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)
- private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+ private val driverUrl = AkkaUtils.address(
+ AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled),
+ SparkEnv.driverActorSystemName,
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)