aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE21
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala4
-rw-r--r--network/common/pom.xml1
-rw-r--r--network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java3
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java87
-rw-r--r--network/shuffle/pom.xml1
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java1
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java9
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java31
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java4
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java113
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala2
15 files changed, 272 insertions, 23 deletions
diff --git a/LICENSE b/LICENSE
index f1732fb47a..3c667bf450 100644
--- a/LICENSE
+++ b/LICENSE
@@ -754,7 +754,7 @@ SUCH DAMAGE.
========================================================================
-For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
========================================================================
Copyright (C) 2008 The Android Open Source Project
@@ -772,6 +772,25 @@ limitations under the License.
========================================================================
+For LimitedInputStream
+ (network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
+========================================================================
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+========================================================================
BSD-style licenses
========================================================================
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 45e9d7f243..e7454beddb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
- serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+ serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 655d16c65c..a5fb87b9b2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -72,7 +72,8 @@ private[spark] class BlockManager(
val conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService)
+ blockTransferService: BlockTransferService,
+ securityManager: SecurityManager)
extends BlockDataManager with Logging {
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -115,7 +116,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTranserService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
+ new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
+ securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
@@ -166,9 +168,10 @@ private[spark] class BlockManager(
conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService) = {
+ blockTransferService: BlockTransferService,
+ securityManager: SecurityManager) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
- conf, mapOutputTracker, shuffleManager, blockTransferService)
+ conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
}
/**
@@ -219,7 +222,6 @@ private[spark] class BlockManager(
return
} catch {
case e: Exception if i < MAX_ATTEMPTS =>
- val attemptsRemaining =
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1461fa69db..f63e772bf1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer)
+ mapOutputTracker, shuffleManager, transfer, securityMgr)
store.initialize("app-id")
allStores += store
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
- 10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+ 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0782876c8e..9529502bc8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer)
+ mapOutputTracker, shuffleManager, transfer, securityMgr)
manager.initialize("app-id")
manager
}
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
- new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/network/common/pom.xml b/network/common/pom.xml
index ea887148d9..6144548a8f 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -50,6 +50,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>11.0.2</version> <!-- yarn 2.4.0's version -->
<scope>provided</scope>
</dependency>
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 89ed79bc63..5fa1527ddf 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -30,6 +30,7 @@ import com.google.common.io.ByteStreams;
import io.netty.channel.DefaultFileRegion;
import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LimitedInputStream;
/**
* A {@link ManagedBuffer} backed by a segment in a file.
@@ -101,7 +102,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
try {
is = new FileInputStream(file);
ByteStreams.skipFully(is, offset);
- return ByteStreams.limit(is, length);
+ return new LimitedInputStream(is, length);
} catch (IOException e) {
try {
if (is != null) {
diff --git a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
new file mode 100644
index 0000000000..63ca43c046
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wraps a {@link InputStream}, limiting the number of bytes which can be read.
+ *
+ * This code is from Guava's 14.0 source code, because there is no compatible way to
+ * use this functionality in both a Guava 11 environment and a Guava >14 environment.
+ */
+public final class LimitedInputStream extends FilterInputStream {
+ private long left;
+ private long mark = -1;
+
+ public LimitedInputStream(InputStream in, long limit) {
+ super(in);
+ Preconditions.checkNotNull(in);
+ Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
+ left = limit;
+ }
+ @Override public int available() throws IOException {
+ return (int) Math.min(in.available(), left);
+ }
+ // it's okay to mark even if mark isn't supported, as reset won't work
+ @Override public synchronized void mark(int readLimit) {
+ in.mark(readLimit);
+ mark = left;
+ }
+ @Override public int read() throws IOException {
+ if (left == 0) {
+ return -1;
+ }
+ int result = in.read();
+ if (result != -1) {
+ --left;
+ }
+ return result;
+ }
+ @Override public int read(byte[] b, int off, int len) throws IOException {
+ if (left == 0) {
+ return -1;
+ }
+ len = (int) Math.min(len, left);
+ int result = in.read(b, off, len);
+ if (result != -1) {
+ left -= result;
+ }
+ return result;
+ }
+ @Override public synchronized void reset() throws IOException {
+ if (!in.markSupported()) {
+ throw new IOException("Mark not supported");
+ }
+ if (mark == -1) {
+ throw new IOException("Mark not set");
+ }
+ in.reset();
+ left = mark;
+ }
+ @Override public long skip(long n) throws IOException {
+ n = Math.min(n, left);
+ long skipped = in.skip(n);
+ left -= skipped;
+ return skipped;
+ }
+}
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index d271704d98..fe5681d463 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -51,6 +51,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>11.0.2</version> <!-- yarn 2.4.0's version -->
<scope>provided</scope>
</dependency>
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
index 72ba737b99..9abad1f30a 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
@@ -126,7 +126,6 @@ public class SparkSaslClient {
logger.trace("SASL client callback: setting realm");
RealmCallback rc = (RealmCallback) callback;
rc.setText(rc.getDefaultText());
- logger.info("Realm callback");
} else if (callback instanceof RealmChoiceCallback) {
// ignore (?)
} else {
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
index 2c0ce40c75..e87b17ead1 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
@@ -34,7 +34,8 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
-import com.google.common.io.BaseEncoding;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.base64.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,12 +160,14 @@ public class SparkSaslServer {
/* Encode a byte[] identifier as a Base64-encoded string. */
public static String encodeIdentifier(String identifier) {
Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
- return BaseEncoding.base64().encode(identifier.getBytes(Charsets.UTF_8));
+ return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
+ .toString(Charsets.UTF_8);
}
/** Encode a password as a base64-encoded char[] array. */
public static char[] encodePassword(String password) {
Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
- return BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)).toCharArray();
+ return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
+ .toString(Charsets.UTF_8).toCharArray();
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index b0b19ba67b..3aa95d00f6 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,12 +17,18 @@
package org.apache.spark.network.shuffle;
+import java.util.List;
+
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
import org.apache.spark.network.util.JavaUtils;
@@ -37,18 +43,35 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleClient extends ShuffleClient {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
- private final TransportClientFactory clientFactory;
+ private final TransportConf conf;
+ private final boolean saslEnabled;
+ private final SecretKeyHolder secretKeyHolder;
+ private TransportClientFactory clientFactory;
private String appId;
- public ExternalShuffleClient(TransportConf conf) {
- TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
- this.clientFactory = context.createClientFactory();
+ /**
+ * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
+ * then secretKeyHolder may be null.
+ */
+ public ExternalShuffleClient(
+ TransportConf conf,
+ SecretKeyHolder secretKeyHolder,
+ boolean saslEnabled) {
+ this.conf = conf;
+ this.secretKeyHolder = secretKeyHolder;
+ this.saslEnabled = saslEnabled;
}
@Override
public void init(String appId) {
this.appId = appId;
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+ List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
+ if (saslEnabled) {
+ bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder));
+ }
+ clientFactory = context.createClientFactory(bootstraps);
}
@Override
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index bc101f5384..71e017b9e4 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -135,7 +135,7 @@ public class ExternalShuffleIntegrationSuite {
final Semaphore requestsRemaining = new Semaphore(0);
- ExternalShuffleClient client = new ExternalShuffleClient(conf);
+ ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init(APP_ID);
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
new BlockFetchingListener() {
@@ -267,7 +267,7 @@ public class ExternalShuffleIntegrationSuite {
}
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) {
- ExternalShuffleClient client = new ExternalShuffleClient(conf);
+ ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init(APP_ID);
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
executorId, executorInfo);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
new file mode 100644
index 0000000000..4c18fcdfbc
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleSecuritySuite {
+
+ TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportServer server;
+
+ @Before
+ public void beforeEach() {
+ RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+ new TestSecretKeyHolder("my-app-id", "secret"));
+ TransportContext context = new TransportContext(conf, handler);
+ this.server = context.createServer();
+ }
+
+ @After
+ public void afterEach() {
+ if (server != null) {
+ server.close();
+ server = null;
+ }
+ }
+
+ @Test
+ public void testValid() {
+ validate("my-app-id", "secret");
+ }
+
+ @Test
+ public void testBadAppId() {
+ try {
+ validate("wrong-app-id", "secret");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
+ }
+ }
+
+ @Test
+ public void testBadSecret() {
+ try {
+ validate("my-app-id", "bad-secret");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
+ }
+ }
+
+ /** Creates an ExternalShuffleClient and attempts to register with the server. */
+ private void validate(String appId, String secretKey) {
+ ExternalShuffleClient client =
+ new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true);
+ client.init(appId);
+ // Registration either succeeds or throws an exception.
+ client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+ new ExecutorShuffleInfo(new String[0], 0, ""));
+ client.close();
+ }
+
+ /** Provides a secret key holder which always returns the given secret key, for a single appId. */
+ static class TestSecretKeyHolder implements SecretKeyHolder {
+ private final String appId;
+ private final String secretKey;
+
+ TestSecretKeyHolder(String appId, String secretKey) {
+ this.appId = appId;
+ this.secretKey = secretKey;
+ }
+
+ @Override
+ public String getSaslUser(String appId) {
+ return "user";
+ }
+
+ @Override
+ public String getSecretKey(String appId) {
+ if (!appId.equals(this.appId)) {
+ throw new IllegalArgumentException("Wrong appId!");
+ }
+ return secretKey;
+ }
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 0f27f55fec..9efe15d01e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
- new NioBlockTransferService(conf, securityMgr))
+ new NioBlockTransferService(conf, securityMgr), securityMgr)
blockManager.initialize("app-id")
tempDirectory = Files.createTempDir()