aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-09-02 12:53:24 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-09-02 12:53:24 -0700
commit2da3a9e98e5d129d4507b5db01bba5ee9558d28e (patch)
treec5197f543f18959d793db1caea4ee553acef4f97 /core
parentfc48307797912dc1d53893dce741ddda8630957b (diff)
downloadspark-2da3a9e98e5d129d4507b5db01bba5ee9558d28e.tar.gz
spark-2da3a9e98e5d129d4507b5db01bba5ee9558d28e.tar.bz2
spark-2da3a9e98e5d129d4507b5db01bba5ee9558d28e.zip
[SPARK-10004] [SHUFFLE] Perform auth checks when clients read shuffle data.
To correctly isolate applications, when requests to read shuffle data arrive at the shuffle service, proper authorization checks need to be performed. This change makes sure that only the application that created the shuffle data can read from it. Such checks are only enabled when "spark.authenticate" is enabled, otherwise there's no secure way to make sure that the client is really who it says it is. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #8218 from vanzin/SPARK-10004.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala2
2 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 7c170a742f..76968249fb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -38,6 +38,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
* is equivalent to one Spark-level shuffle block.
*/
class NettyBlockRpcServer(
+ appId: String,
serializer: Serializer,
blockManager: BlockDataManager)
extends RpcHandler with Logging {
@@ -55,7 +56,7 @@ class NettyBlockRpcServer(
case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
- val streamId = streamManager.registerStream(blocks.iterator.asJava)
+ val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ff8aae9ebe..d5ad2c9ad0 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -49,7 +49,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
private[this] var appId: String = _
override def init(blockDataManager: BlockDataManager): Unit = {
- val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+ val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
var serverBootstrap: Option[TransportServerBootstrap] = None
var clientBootstrap: Option[TransportClientBootstrap] = None
if (authEnabled) {