author     Marcelo Vanzin <vanzin@cloudera.com>    2015-05-01 19:01:46 -0700
committer  Reynold Xin <rxin@databricks.com>       2015-05-01 19:01:46 -0700
commit     38d4e9e446b425ca6a8fe8d8080f387b08683842 (patch)
tree       091ca906d1281ce5f95772f1748ed79bd44ca655 /core/src/main/scala/org
parent     8f50a07d2188ccc5315d979755188b1e5d5b5471 (diff)
download   spark-38d4e9e446b425ca6a8fe8d8080f387b08683842.tar.gz
           spark-38d4e9e446b425ca6a8fe8d8080f387b08683842.tar.bz2
           spark-38d4e9e446b425ca6a8fe8d8080f387b08683842.zip
[SPARK-6229] Add SASL encryption to network library.
There are two main parts of this change:

- Extending the bootstrap mechanism in the network library to add a
  server-side bootstrap (which works a little bit differently than the
  client-side bootstrap), and to allow the bootstraps to modify the
  underlying channel.

- Using SASL to encrypt data going through the RPC channel.

The second item requires some non-optimal code to work around the fact
that the outbound path in Netty is not thread-safe, and ordering is very
important when encryption is in the picture.

A lot of the changes outside the network/common library just adjust to
the changed API for initializing the RPC server.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5377 from vanzin/SPARK-6229 and squashes the following commits:

ff01966 [Marcelo Vanzin] Use fancy new size config style.
be53f32 [Marcelo Vanzin] Merge branch 'master' into SPARK-6229
47d4aff [Marcelo Vanzin] Merge branch 'master' into SPARK-6229
7a2a805 [Marcelo Vanzin] Clean up some unneeded changes.
2f92237 [Marcelo Vanzin] Add comment.
67bb0c6 [Marcelo Vanzin] Revert "Avoid exposing ByteArrayWritableChannel outside of test code."
065f684 [Marcelo Vanzin] Add test to verify chunking.
3d1695d [Marcelo Vanzin] Minor cleanups.
73cff0e [Marcelo Vanzin] Skip bytes in decode path too.
318ad23 [Marcelo Vanzin] Avoid exposing ByteArrayWritableChannel outside of test code.
346f829 [Marcelo Vanzin] Avoid trip through channel selector by not reporting 0 bytes written.
a4a5938 [Marcelo Vanzin] Review feedback.
4797519 [Marcelo Vanzin] Remove unused import.
9908ada [Marcelo Vanzin] Fix test, SASL backend disposal.
7fe1489 [Marcelo Vanzin] Add a test that makes sure encryption is actually enabled.
adb6f9d [Marcelo Vanzin] Review feedback.
cf2a605 [Marcelo Vanzin] Clean up some code.
8584323 [Marcelo Vanzin] Fix a comment.
e98bc55 [Marcelo Vanzin] Add option to only allow encrypted connections to the server.
dad42fc [Marcelo Vanzin] Make encryption thread-safe, less memory-intensive.
b00999a [Marcelo Vanzin] Consolidate ByteArrayWritableChannel, fix SASL code to match master changes.
b923cae [Marcelo Vanzin] Make SASL encryption handler thread-safe, handle FileRegion messages.
39539a7 [Marcelo Vanzin] Add config option to enable SASL encryption.
351a86f [Marcelo Vanzin] Add SASL encryption to network library.
fbe6ccb [Marcelo Vanzin] Add TransportServerBootstrap, make SASL code use it.
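For anyone trying the feature out, a minimal configuration sketch. The three keys below are real Spark options (the last one is the option this change adds); the secret value is a placeholder, and spark.authenticate.secret only applies to deployments where the secret is set by hand (e.g. standalone) rather than generated:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // SASL authentication is a prerequisite for SASL encryption.
      .set("spark.authenticate", "true")
      // Placeholder secret; in standalone mode the same value must be set everywhere.
      .set("spark.authenticate.secret", "not-a-real-secret")
      // New in this change: encrypt traffic on channels built on network-common.
      .set("spark.authenticate.enableSaslEncryption", "true")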
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/SecurityManager.scala                          17
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala            17
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala  22
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala             4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                      3
5 files changed, 41 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 3653f724ba..8aed1e20e0 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -150,8 +150,13 @@ import org.apache.spark.util.Utils
* authorization. If no filter is in place the user is generally null and no authorization
* can take place.
*
- * Connection encryption (SSL) configuration is organized hierarchically. The user can configure
- * the default SSL settings which will be used for all the supported communication protocols unless
+ * When authentication is being used, encryption can also be enabled by setting the option
+ * spark.authenticate.enableSaslEncryption to true. This is only supported by communication
+ * channels that use the network-common library, and can be used as an alternative to SSL in those
+ * cases.
+ *
+ * SSL can be used for encryption for certain communication channels. The user can configure the
+ * default SSL settings which will be used for all the supported communication protocols unless
* they are overwritten by protocol specific settings. This way the user can easily provide the
* common settings for all the protocols without disabling the ability to configure each one
* individually.
@@ -413,6 +418,14 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
def isAuthenticationEnabled(): Boolean = authOn
/**
+ * Checks whether SASL encryption should be enabled.
+ * @return Whether to enable SASL encryption when connecting to services that support it.
+ */
+ def isSaslEncryptionEnabled(): Boolean = {
+ sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
+ }
+
+ /**
* Gets the user used for authenticating HTTP connections.
* For now use a single hardcoded user.
* @return the HTTP user as a String
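The "server-side bootstrap" the commit message refers to lives in network-common (as a Java interface in the real tree); a Scala sketch of its shape, as implied by the call sites in this diff:

    import io.netty.channel.Channel
    import org.apache.spark.network.server.RpcHandler

    // Sketch: invoked once per accepted channel. Returning an RpcHandler lets a
    // bootstrap both modify the channel (e.g. add an encryption handler to the
    // Netty pipeline) and interpose on RPC dispatch, which is why it works a
    // little differently from the client-side bootstrap.
    trait TransportServerBootstrap {
      def doBootstrap(channel: Channel, rpcHandler: RpcHandler): RpcHandler
    }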
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index cd16f992a3..09973a0a2c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -19,10 +19,12 @@ package org.apache.spark.deploy
import java.util.concurrent.CountDownLatch
+import scala.collection.JavaConversions._
+
import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.SaslRpcHandler
+import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.util.Utils
@@ -44,10 +46,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
- private val transportContext: TransportContext = {
- val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
- new TransportContext(transportConf, handler)
- }
+ private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)
private var server: TransportServer = _
@@ -62,7 +61,13 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
def start() {
require(server == null, "Shuffle server already started")
logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
- server = transportContext.createServer(port)
+ val bootstraps =
+ if (useSasl) {
+ Seq(new SaslServerBootstrap(transportConf, securityManager))
+ } else {
+ Nil
+ }
+ server = transportContext.createServer(port, bootstraps)
}
def stop() {
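The design shift above: the service no longer wraps blockHandler in a SaslRpcHandler once at construction; instead, each accepted channel gets its own SASL wrapper when the server bootstrap runs. A simplified Scala sketch of that pattern (the real SaslServerBootstrap is Java code in network-common, and the SaslRpcHandler constructor arguments shown are an assumption about the post-patch signature):

    import io.netty.channel.Channel
    import org.apache.spark.network.sasl.{SaslRpcHandler, SecretKeyHolder}
    import org.apache.spark.network.server.{RpcHandler, TransportServerBootstrap}
    import org.apache.spark.network.util.TransportConf

    class SketchSaslServerBootstrap(conf: TransportConf, keyHolder: SecretKeyHolder)
      extends TransportServerBootstrap {
      // Wrap the application handler per channel: the wrapper performs the SASL
      // handshake on that channel and only then delegates RPCs to `rpcHandler`.
      override def doBootstrap(channel: Channel, rpcHandler: RpcHandler): RpcHandler =
        new SaslRpcHandler(conf, channel, rpcHandler, keyHolder)
    }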
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 3f0950dae1..6181c0ee9f 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{TransportClientBootstrap, RpcResponseCallback, TransportClientFactory}
-import org.apache.spark.network.sasl.{SaslRpcHandler, SaslClientBootstrap}
+import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher}
import org.apache.spark.network.shuffle.protocol.UploadBlock
@@ -49,18 +49,18 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
private[this] var appId: String = _
override def init(blockDataManager: BlockDataManager): Unit = {
- val (rpcHandler: RpcHandler, bootstrap: Option[TransportClientBootstrap]) = {
- val nettyRpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
- if (!authEnabled) {
- (nettyRpcHandler, None)
- } else {
- (new SaslRpcHandler(nettyRpcHandler, securityManager),
- Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager)))
- }
+ val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+ var serverBootstrap: Option[TransportServerBootstrap] = None
+ var clientBootstrap: Option[TransportClientBootstrap] = None
+ if (authEnabled) {
+ serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
+ clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
+ securityManager.isSaslEncryptionEnabled()))
}
transportContext = new TransportContext(transportConf, rpcHandler)
- clientFactory = transportContext.createClientFactory(bootstrap.toList)
- server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0))
+ clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
+ server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0),
+ serverBootstrap.toList)
appId = conf.getAppId
logInfo("Server created on " + server.getPort)
}
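Seen from a consumer of the library, the client side mirrors this. A hedged sketch, reusing transportConf, appId, securityManager and transportContext as defined in the surrounding class; the host and port are hypothetical:

    import scala.collection.JavaConversions._

    import org.apache.spark.network.client.TransportClientBootstrap
    import org.apache.spark.network.sasl.SaslClientBootstrap

    val clientBootstraps: List[TransportClientBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        // Authenticate, and additionally request encryption if configured.
        List(new SaslClientBootstrap(transportConf, appId, securityManager,
          securityManager.isSaslEncryptionEnabled()))
      } else {
        Nil
      }
    val factory = transportContext.createClientFactory(clientBootstraps)
    val client = factory.createClient("shuffle-host.example", 7337) // hypothetical endpoint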
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 16e905982c..497871ed6d 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -656,7 +656,7 @@ private[nio] class ConnectionManager(
connection.synchronized {
if (connection.sparkSaslServer == null) {
logDebug("Creating sasl Server")
- connection.sparkSaslServer = new SparkSaslServer(conf.getAppId, securityManager)
+ connection.sparkSaslServer = new SparkSaslServer(conf.getAppId, securityManager, false)
}
}
replyToken = connection.sparkSaslServer.response(securityMsg.getToken)
@@ -800,7 +800,7 @@ private[nio] class ConnectionManager(
if (!conn.isSaslComplete()) {
conn.synchronized {
if (conn.sparkSaslClient == null) {
- conn.sparkSaslClient = new SparkSaslClient(conf.getAppId, securityManager)
+ conn.sparkSaslClient = new SparkSaslClient(conf.getAppId, securityManager, false)
var firstResponse: Array[Byte] = null
try {
firstResponse = conn.sparkSaslClient.firstToken()
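The new trailing boolean on SparkSaslClient and SparkSaslServer appears to be the encryption flag; the legacy nio ConnectionManager hard-codes false because, per the commit message, encryption is only supported on channels built on network-common. Spelled out (parameter name assumed):

    // false = authentication only (SASL "auth" QOP); true would negotiate
    // encryption ("auth-conf"), which this legacy nio path cannot carry.
    conn.sparkSaslClient = new SparkSaslClient(conf.getAppId, securityManager,
      /* encrypt = */ false)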
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 402ee1c764..a46fecd227 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -111,7 +111,8 @@ private[spark] class BlockManager(
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
- new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
+ new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
+ securityManager.isSaslEncryptionEnabled())
} else {
blockTransferService
}