author     Marcelo Vanzin <vanzin@cloudera.com>    2017-01-24 10:44:04 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2017-01-24 10:44:04 -0800
commit     8f3f73abc1fe62496722476460c174af0250e3fe (patch)
tree       345e96eab2294792a867d6009cc9209d6ec0b27f /core/src/main/scala/org
parent     d9783380ff0a6440117348dee3205826d0f9687e (diff)
[SPARK-19139][CORE] New auth mechanism for transport library.
This change introduces a new auth mechanism to the transport library, to be used when users enable strong encryption. This auth mechanism has better security than the currently used DIGEST-MD5.

The new protocol uses symmetric-key encryption to mutually authenticate the endpoints, and is very loosely based on ISO/IEC 9798. It falls back to SASL when it thinks the remote end is old. SASL does not support asking the server which auth protocols it understands (which would have allowed re-using the existing SASL code by just adding a new SASL provider), so the protocol is implemented outside of the SASL API, avoiding the boilerplate of a new provider.

Details of the auth protocol are discussed in the included README.md file.

This change partly undoes the changes added in SPARK-13331: AES encryption is now decoupled from SASL authentication. The encryption code itself, though, has been re-used as part of this change.

## How was this patch tested?

- Unit tests
- Tested Spark 2.2 against a Spark 1.6 shuffle service with SASL enabled
- Tested Spark 2.2 against a Spark 2.2 shuffle service with SASL fallback disabled

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16521 from vanzin/SPARK-19139.
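As a usage sketch (not part of this change's diff), enabling the new mechanism is purely a matter of configuration; the key names below come from the config entries added in this patch, and the shared secret is only needed outside YARN:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: turn on RPC authentication plus the new AES-based
// encryption. Against an old shuffle service, the new auth protocol
// transparently falls back to SASL.
val conf = new SparkConf()
  .set("spark.authenticate", "true")             // NETWORK_AUTH_ENABLED
  .set("spark.authenticate.secret", "<shared-secret>")
  .set("spark.network.crypto.enabled", "true")   // NETWORK_ENCRYPTION_ENABLED
```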
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/SecurityManager.scala                         | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                               |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala           | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                 | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala                   |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                    |  3
8 files changed, 41 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 9bdc5096b6..cde768281f 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.util.Utils
@@ -191,7 +192,7 @@ private[spark] class SecurityManager(
// allow all users/groups to have view/modify permissions
private val WILDCARD_ACL = "*"
- private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -516,11 +517,11 @@ private[spark] class SecurityManager(
def isAuthenticationEnabled(): Boolean = authOn
/**
- * Checks whether SASL encryption should be enabled.
- * @return Whether to enable SASL encryption when connecting to services that support it.
+ * Checks whether network encryption should be enabled.
+ * @return Whether to enable encryption when connecting to services that support it.
*/
- def isSaslEncryptionEnabled(): Boolean = {
- sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
+ def isEncryptionEnabled(): Boolean = {
+ sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
}
/**
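For illustration, a minimal sketch of the renamed check's behavior; `SecurityManager` is `private[spark]`, so this assumes code living inside that package. The point is that the legacy SASL flag still turns encryption on, so existing configurations keep working:

```scala
import org.apache.spark.{SecurityManager, SparkConf}

// Legacy-style configuration: only the old SASL encryption flag is set.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "test-secret")
  .set("spark.authenticate.enableSaslEncryption", "true")

val sm = new SecurityManager(conf)
// isEncryptionEnabled() honors both the new flag and the legacy one,
// so callers no longer need to care which mechanism encrypts the channel.
assert(sm.isEncryptionEnabled())
```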
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 601d24191e..308a1ed5fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -607,6 +607,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
"\"client\".")
}
}
+
+ val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
+ require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
+ s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
}
/**
@@ -726,6 +730,7 @@ private[spark] object SparkConf extends Logging {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
+ name.startsWith("spark.network") ||
isSparkPortConf(name)
}
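The new `require` means that asking for encryption without authentication now fails fast when the conf is validated. Since `validateSettings()` is `private[spark]`, here is a sketch of the equivalent check expressed against public getters:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.crypto.enabled", "true")
  // spark.authenticate is left at its default of false

val encryptionEnabled =
  conf.getBoolean("spark.network.crypto.enabled", false) ||
  conf.getBoolean("spark.authenticate.enableSaslEncryption", false)

// Mirrors the new check: this throws IllegalArgumentException, because
// encryption is requested while authentication is off.
require(!encryptionEnabled || conf.getBoolean("spark.authenticate", false),
  "spark.authenticate must be enabled when enabling encryption.")
```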
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1296386ac9..539dbb55ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -235,7 +235,7 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf, ioEncryptionKey)
ioEncryptionKey.foreach { _ =>
- if (!securityManager.isSaslEncryptionEnabled()) {
+ if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 13eadbe44f..8d491ddf6e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -25,8 +25,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.TransportContext
+import org.apache.spark.network.crypto.AuthServerBootstrap
import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
@@ -47,7 +47,6 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
- private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -74,10 +73,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
/** Start the external shuffle service */
def start() {
require(server == null, "Shuffle server already started")
- logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+ val authEnabled = securityManager.isAuthenticationEnabled()
+ logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
val bootstraps: Seq[TransportServerBootstrap] =
- if (useSasl) {
- Seq(new SaslServerBootstrap(transportConf, securityManager))
+ if (authEnabled) {
+ Seq(new AuthServerBootstrap(transportConf, securityManager))
} else {
Nil
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aba429bcdc..536f493b41 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -243,4 +243,20 @@ package object config {
"and event logs.")
.stringConf
.createWithDefault("(?i)secret|password")
+
+ private[spark] val NETWORK_AUTH_ENABLED =
+ ConfigBuilder("spark.authenticate")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SASL_ENCRYPTION_ENABLED =
+ ConfigBuilder("spark.authenticate.enableSaslEncryption")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val NETWORK_ENCRYPTION_ENABLED =
+ ConfigBuilder("spark.network.crypto.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
}
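The three new entries follow core's `ConfigBuilder` pattern, giving call sites typed access with the default baked in. A sketch of how they are read elsewhere in this diff (the entries are `private[spark]`, so this assumes code inside `org.apache.spark`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf().set("spark.network.crypto.enabled", "true")

// Each ConfigEntry bundles its key, type, and default, so call sites
// avoid repeating string keys and fallback values.
val authOn: Boolean   = conf.get(NETWORK_AUTH_ENABLED)        // false (default)
val cryptoOn: Boolean = conf.get(NETWORK_ENCRYPTION_ENABLED)  // true
```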
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 3d4ea3cccc..b75e91b660 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher}
import org.apache.spark.network.shuffle.protocol.UploadBlock
@@ -63,9 +63,8 @@ private[spark] class NettyBlockTransferService(
var serverBootstrap: Option[TransportServerBootstrap] = None
var clientBootstrap: Option[TransportClientBootstrap] = None
if (authEnabled) {
- serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
- clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
- securityManager.isSaslEncryptionEnabled()))
+ serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager))
+ clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager))
}
transportContext = new TransportContext(transportConf, rpcHandler)
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e56943da13..1e448b2f1a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -33,8 +33,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client._
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance}
@@ -60,8 +60,8 @@ private[netty] class NettyRpcEnv(
private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
if (securityManager.isAuthenticationEnabled()) {
- java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager,
- securityManager.isSaslEncryptionEnabled()))
+ java.util.Arrays.asList(new AuthClientBootstrap(transportConf,
+ securityManager.getSaslUser(), securityManager))
} else {
java.util.Collections.emptyList[TransportClientBootstrap]
}
@@ -111,7 +111,7 @@ private[netty] class NettyRpcEnv(
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
- java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
+ java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 04521c9159..c40186756f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -125,8 +125,7 @@ private[spark] class BlockManager(
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
- new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
- securityManager.isSaslEncryptionEnabled())
+ new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}