author     Marcelo Vanzin <vanzin@cloudera.com>    2015-09-02 12:53:24 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>    2015-09-02 12:53:24 -0700
commit     2da3a9e98e5d129d4507b5db01bba5ee9558d28e (patch)
tree       c5197f543f18959d793db1caea4ee553acef4f97 /core
parent     fc48307797912dc1d53893dce741ddda8630957b (diff)
[SPARK-10004] [SHUFFLE] Perform auth checks when clients read shuffle data.
To correctly isolate applications, the shuffle service needs to perform
proper authorization checks when requests to read shuffle data arrive.
This change ensures that only the application that created the shuffle
data can read it.
Such checks are enabled only when "spark.authenticate" is enabled;
otherwise there is no secure way to verify that a client really is who
it claims to be.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #8218 from vanzin/SPARK-10004.
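As an illustrative aside (not part of this patch): the checks above take effect only for SASL-authenticated connections, which the application must opt into. Enabling that looks roughly like the sketch below; on YARN the shared secret is generated automatically, while other deployments must supply one (the value here is a placeholder):

import org.apache.spark.SparkConf

// Illustrative sketch: the authorization added by this patch is gated on
// SASL authentication being enabled for the application.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "<shared-secret>")  // placeholder value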
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala        | 3 ++-
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala  | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 7c170a742f..76968249fb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -38,6 +38,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
  * is equivalent to one Spark-level shuffle block.
  */
 class NettyBlockRpcServer(
+    appId: String,
     serializer: Serializer,
     blockManager: BlockDataManager)
   extends RpcHandler with Logging {
@@ -55,7 +56,7 @@ class NettyBlockRpcServer(
       case openBlocks: OpenBlocks =>
         val blocks: Seq[ManagedBuffer] =
           openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
-        val streamId = streamManager.registerStream(blocks.iterator.asJava)
+        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
         logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
         responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ff8aae9ebe..d5ad2c9ad0 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -49,7 +49,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   private[this] var appId: String = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
-    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
     var serverBootstrap: Option[TransportServerBootstrap] = None
     var clientBootstrap: Option[TransportClientBootstrap] = None
     if (authEnabled) {
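For context on what the appId plumbing above enables: the stream manager in network-common can now reject reads that cross application boundaries, since every stream is registered under the app that owns it. Below is a minimal, hedged sketch of that check; the names (AppAwareStreamManager, StreamState) are illustrative stand-ins for the real OneForOneStreamManager bookkeeping, and only the shape of the logic (compare the SASL-authenticated client identity against the stream's owning app) reflects what this commit relies on.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Sketch only: illustrative names, not Spark's actual classes.
final case class StreamState(appId: String, buffers: Iterator[AnyRef])

class AppAwareStreamManager {
  private val streams = new ConcurrentHashMap[Long, StreamState]()
  private val nextStreamId = new AtomicLong(0)

  // Streams are registered under the ID of the application that owns them,
  // mirroring registerStream(appId, blocks.iterator.asJava) in the diff.
  def registerStream(appId: String, buffers: Iterator[AnyRef]): Long = {
    val streamId = nextStreamId.incrementAndGet()
    streams.put(streamId, StreamState(appId, buffers))
    streamId
  }

  // Called before serving a chunk. clientId is the SASL-authenticated
  // identity of the connection, or null when spark.authenticate is off,
  // in which case no trustworthy check is possible.
  def checkAuthorization(clientId: String, streamId: Long): Unit = {
    if (clientId != null) {
      val state = streams.get(streamId)
      require(state != null, s"Unknown stream ID $streamId")
      if (clientId != state.appId) {
        throw new SecurityException(
          s"Client $clientId not authorized to read stream $streamId of app ${state.appId}")
      }
    }
  }
}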