aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-05 14:38:43 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-05 14:38:43 -0800
commit4c42986cc070d9c5c55c7bf8a2a67585967b1082 (patch)
tree0c20263f4d5b7cca3be13e3f9a160e2eb8014a63 /core/src
parent5b3b6f6f5f029164d7749366506e142b104c1d43 (diff)
downloadspark-4c42986cc070d9c5c55c7bf8a2a67585967b1082.tar.gz
spark-4c42986cc070d9c5c55c7bf8a2a67585967b1082.tar.bz2
spark-4c42986cc070d9c5c55c7bf8a2a67585967b1082.zip
[SPARK-4242] [Core] Add SASL to external shuffle service
Does three things: (1) Adds SASL to ExternalShuffleClient, (2) puts SecurityManager in BlockManager's constructor, and (3) adds unit test. Author: Aaron Davidson <aaron@databricks.com> Closes #3108 from aarondav/sasl-client and squashes the following commits: 48b622d [Aaron Davidson] Screw it, let's just get LimitedInputStream 3543b70 [Aaron Davidson] Back out of pom change due to unknown test issue? b58518a [Aaron Davidson] ByteStreams.limit() not available :( cbe451a [Aaron Davidson] Address comments 2bf2908 [Aaron Davidson] [SPARK-4242] [Core] Add SASL to external shuffle service
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala4
4 files changed, 12 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 45e9d7f243..e7454beddb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
- serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+ serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 655d16c65c..a5fb87b9b2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -72,7 +72,8 @@ private[spark] class BlockManager(
val conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService)
+ blockTransferService: BlockTransferService,
+ securityManager: SecurityManager)
extends BlockDataManager with Logging {
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -115,7 +116,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTranserService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
+ new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
+ securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
@@ -166,9 +168,10 @@ private[spark] class BlockManager(
conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService) = {
+ blockTransferService: BlockTransferService,
+ securityManager: SecurityManager) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
- conf, mapOutputTracker, shuffleManager, blockTransferService)
+ conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
}
/**
@@ -219,7 +222,6 @@ private[spark] class BlockManager(
return
} catch {
case e: Exception if i < MAX_ATTEMPTS =>
- val attemptsRemaining =
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1461fa69db..f63e772bf1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer)
+ mapOutputTracker, shuffleManager, transfer, securityMgr)
store.initialize("app-id")
allStores += store
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
- 10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+ 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0782876c8e..9529502bc8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer)
+ mapOutputTracker, shuffleManager, transfer, securityMgr)
manager.initialize("app-id")
manager
}
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
- new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
// The put should fail since a1 is not serializable.
class UnserializableClass