aboutsummaryrefslogtreecommitdiff
path: root/network/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'network/yarn')
-rw-r--r--network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java15
1 files changed, 9 insertions, 6 deletions
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 63b21222e7..463f99ef33 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -17,9 +17,10 @@
package org.apache.spark.network.yarn;
-import java.lang.Override;
import java.nio.ByteBuffer;
+import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -32,10 +33,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
@@ -103,16 +105,17 @@ public class YarnShuffleService extends AuxiliaryService {
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
blockHandler = new ExternalShuffleBlockHandler(transportConf);
- RpcHandler rpcHandler = blockHandler;
+
+ List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
if (authEnabled) {
secretManager = new ShuffleSecretManager();
- rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
+ bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
}
int port = conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
- shuffleServer = transportContext.createServer(port);
+ TransportContext transportContext = new TransportContext(transportConf, blockHandler);
+ shuffleServer = transportContext.createServer(port, bootstraps);
String authEnabledString = authEnabled ? "enabled" : "not enabled";
logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}.", port, authEnabledString);