aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala3
8 files changed, 41 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 9bdc5096b6..cde768281f 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.util.Utils
@@ -191,7 +192,7 @@ private[spark] class SecurityManager(
// allow all users/groups to have view/modify permissions
private val WILDCARD_ACL = "*"
- private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -516,11 +517,11 @@ private[spark] class SecurityManager(
def isAuthenticationEnabled(): Boolean = authOn
/**
- * Checks whether SASL encryption should be enabled.
- * @return Whether to enable SASL encryption when connecting to services that support it.
+ * Checks whether network encryption should be enabled.
+ * @return Whether to enable encryption when connecting to services that support it.
*/
- def isSaslEncryptionEnabled(): Boolean = {
- sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
+ def isEncryptionEnabled(): Boolean = {
+ sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 601d24191e..308a1ed5fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -607,6 +607,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
"\"client\".")
}
}
+
+ val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
+ require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
+ s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
}
/**
@@ -726,6 +730,7 @@ private[spark] object SparkConf extends Logging {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
+ name.startsWith("spark.network") ||
isSparkPortConf(name)
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1296386ac9..539dbb55ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -235,7 +235,7 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf, ioEncryptionKey)
ioEncryptionKey.foreach { _ =>
- if (!securityManager.isSaslEncryptionEnabled()) {
+ if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 13eadbe44f..8d491ddf6e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -25,8 +25,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.TransportContext
+import org.apache.spark.network.crypto.AuthServerBootstrap
import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
@@ -47,7 +47,6 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
- private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -74,10 +73,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
/** Start the external shuffle service */
def start() {
require(server == null, "Shuffle server already started")
- logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+ val authEnabled = securityManager.isAuthenticationEnabled()
+ logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
val bootstraps: Seq[TransportServerBootstrap] =
- if (useSasl) {
- Seq(new SaslServerBootstrap(transportConf, securityManager))
+ if (authEnabled) {
+ Seq(new AuthServerBootstrap(transportConf, securityManager))
} else {
Nil
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aba429bcdc..536f493b41 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -243,4 +243,20 @@ package object config {
"and event logs.")
.stringConf
.createWithDefault("(?i)secret|password")
+
+ private[spark] val NETWORK_AUTH_ENABLED =
+ ConfigBuilder("spark.authenticate")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SASL_ENCRYPTION_ENABLED =
+ ConfigBuilder("spark.authenticate.enableSaslEncryption")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val NETWORK_ENCRYPTION_ENABLED =
+ ConfigBuilder("spark.network.crypto.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 3d4ea3cccc..b75e91b660 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher}
import org.apache.spark.network.shuffle.protocol.UploadBlock
@@ -63,9 +63,8 @@ private[spark] class NettyBlockTransferService(
var serverBootstrap: Option[TransportServerBootstrap] = None
var clientBootstrap: Option[TransportClientBootstrap] = None
if (authEnabled) {
- serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
- clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
- securityManager.isSaslEncryptionEnabled()))
+ serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager))
+ clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager))
}
transportContext = new TransportContext(transportConf, rpcHandler)
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e56943da13..1e448b2f1a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -33,8 +33,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client._
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance}
@@ -60,8 +60,8 @@ private[netty] class NettyRpcEnv(
private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
if (securityManager.isAuthenticationEnabled()) {
- java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager,
- securityManager.isSaslEncryptionEnabled()))
+ java.util.Arrays.asList(new AuthClientBootstrap(transportConf,
+ securityManager.getSaslUser(), securityManager))
} else {
java.util.Collections.emptyList[TransportClientBootstrap]
}
@@ -111,7 +111,7 @@ private[netty] class NettyRpcEnv(
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
- java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
+ java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 04521c9159..c40186756f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -125,8 +125,7 @@ private[spark] class BlockManager(
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
- new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
- securityManager.isSaslEncryptionEnabled())
+ new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}