aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
Diffstat (limited to 'network')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java117
-rw-r--r--network/yarn/pom.xml58
-rw-r--r--network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java176
-rw-r--r--network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java42
4 files changed, 393 insertions, 0 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
new file mode 100644
index 0000000000..e66c4af0f1
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.sasl.SecretKeyHolder;
+
+/**
+ * A class that manages shuffle secret used by the external shuffle service.
+ */
+public class ShuffleSecretManager implements SecretKeyHolder {
+ private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
+ private final ConcurrentHashMap<String, String> shuffleSecretMap;
+
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ // Spark user used for authenticating SASL connections
+ // Note that this must match the value in org.apache.spark.SecurityManager
+ private static final String SPARK_SASL_USER = "sparkSaslUser";
+
+ /**
+ * Convert the given string to a byte buffer. The resulting buffer can be converted back to
+ * the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
+ * shuffle service represents shuffle secrets as bytes buffers instead of strings.
+ */
+ public static ByteBuffer stringToBytes(String s) {
+ return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
+ }
+
+ /**
+ * Convert the given byte buffer to a string. The resulting string can be converted back to
+ * the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
+ * shuffle service represents shuffle secrets as bytes buffers instead of strings.
+ */
+ public static String bytesToString(ByteBuffer b) {
+ return new String(b.array(), UTF8_CHARSET);
+ }
+
+ public ShuffleSecretManager() {
+ shuffleSecretMap = new ConcurrentHashMap<String, String>();
+ }
+
+ /**
+ * Register an application with its secret.
+ * Executors need to first authenticate themselves with the same secret before
+ * fetching shuffle files written by other executors in this application.
+ */
+ public void registerApp(String appId, String shuffleSecret) {
+ if (!shuffleSecretMap.contains(appId)) {
+ shuffleSecretMap.put(appId, shuffleSecret);
+ logger.info("Registered shuffle secret for application {}", appId);
+ } else {
+ logger.debug("Application {} already registered", appId);
+ }
+ }
+
+ /**
+ * Register an application with its secret specified as a byte buffer.
+ */
+ public void registerApp(String appId, ByteBuffer shuffleSecret) {
+ registerApp(appId, bytesToString(shuffleSecret));
+ }
+
+ /**
+ * Unregister an application along with its secret.
+ * This is called when the application terminates.
+ */
+ public void unregisterApp(String appId) {
+ if (shuffleSecretMap.contains(appId)) {
+ shuffleSecretMap.remove(appId);
+ logger.info("Unregistered shuffle secret for application {}", appId);
+ } else {
+ logger.warn("Attempted to unregister application {} when it is not registered", appId);
+ }
+ }
+
+ /**
+ * Return the Spark user for authenticating SASL connections.
+ */
+ @Override
+ public String getSaslUser(String appId) {
+ return SPARK_SASL_USER;
+ }
+
+ /**
+ * Return the secret key registered with the given application.
+ * This key is used to authenticate the executors before they can fetch shuffle files
+ * written by this application from the external shuffle service. If the specified
+ * application is not registered, return null.
+ */
+ @Override
+ public String getSecretKey(String appId) {
+ return shuffleSecretMap.get(appId);
+ }
+}
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
new file mode 100644
index 0000000000..e60d8c1f78
--- /dev/null
+++ b/network/yarn/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-yarn_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Yarn Shuffle Service Code</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>network-yarn</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- Core dependencies -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Provided dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
new file mode 100644
index 0000000000..bb0b8f7e6c
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.ShuffleSecretManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.yarn.util.HadoopConfigProvider;
+
+/**
+ * An external shuffle service used by Spark on Yarn.
+ *
+ * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
+ * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
+ * The application also automatically derives the service port through `spark.shuffle.service.port`
+ * specified in the Yarn configuration. This is so that both the clients and the server agree on
+ * the same port to communicate on.
+ *
+ * The service also optionally supports authentication. This ensures that executors from one
+ * application cannot read the shuffle files written by those from another. This feature can be
+ * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
+ * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
+ * the case of the service port, will not inherit this setting from the Yarn configuration. This
+ * is because an application running on the same Yarn cluster may choose to not use the external
+ * shuffle service, in which case its setting of `spark.authenticate` should be independent of
+ * the service's.
+ */
+public class YarnShuffleService extends AuxiliaryService {
+ private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
+
+ // Port on which the shuffle server listens for fetch requests
+ private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
+ private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
+
+ // Whether the shuffle server should authenticate fetch requests
+ private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
+ private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+
+ // An entity that manages the shuffle secret per application
+ // This is used only if authentication is enabled
+ private ShuffleSecretManager secretManager;
+
+ // The actual server that serves shuffle files
+ private TransportServer shuffleServer = null;
+
+ public YarnShuffleService() {
+ super("spark_shuffle");
+ logger.info("Initializing YARN shuffle service for Spark");
+ }
+
+ /**
+ * Return whether authentication is enabled as specified by the configuration.
+ * If so, fetch requests will fail unless the appropriate authentication secret
+ * for the application is provided.
+ */
+ private boolean isAuthenticationEnabled() {
+ return secretManager != null;
+ }
+
+ /**
+ * Start the shuffle server with the given configuration.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) {
+ // If authentication is enabled, set up the shuffle server to use a
+ // special RPC handler that filters out unauthenticated fetch requests
+ boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+ if (authEnabled) {
+ secretManager = new ShuffleSecretManager();
+ rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
+ }
+
+ int port = conf.getInt(
+ SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+ TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+ TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
+ shuffleServer = transportContext.createServer(port);
+ String authEnabledString = authEnabled ? "enabled" : "not enabled";
+ logger.info("Started YARN shuffle service for Spark on port {}. " +
+ "Authentication is {}.", port, authEnabledString);
+ }
+
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ String appId = context.getApplicationId().toString();
+ try {
+ ByteBuffer shuffleSecret = context.getApplicationDataForService();
+ logger.info("Initializing application {}", appId);
+ if (isAuthenticationEnabled()) {
+ secretManager.registerApp(appId, shuffleSecret);
+ }
+ } catch (Exception e) {
+ logger.error("Exception when initializing application {}", appId, e);
+ }
+ }
+
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ String appId = context.getApplicationId().toString();
+ try {
+ logger.info("Stopping application {}", appId);
+ if (isAuthenticationEnabled()) {
+ secretManager.unregisterApp(appId);
+ }
+ } catch (Exception e) {
+ logger.error("Exception when stopping application {}", appId, e);
+ }
+ }
+
+ @Override
+ public void initializeContainer(ContainerInitializationContext context) {
+ ContainerId containerId = context.getContainerId();
+ logger.info("Initializing container {}", containerId);
+ }
+
+ @Override
+ public void stopContainer(ContainerTerminationContext context) {
+ ContainerId containerId = context.getContainerId();
+ logger.info("Stopping container {}", containerId);
+ }
+
+ /**
+ * Close the shuffle server to clean up any associated state.
+ */
+ @Override
+ protected void serviceStop() {
+ try {
+ if (shuffleServer != null) {
+ shuffleServer.close();
+ }
+ } catch (Exception e) {
+ logger.error("Exception when stopping service", e);
+ }
+ }
+
+ // Not currently used
+ @Override
+ public ByteBuffer getMetaData() {
+ return ByteBuffer.allocate(0);
+ }
+
+}
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
new file mode 100644
index 0000000000..884861752e
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn.util;
+
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.spark.network.util.ConfigProvider;
+
+/** Use the Hadoop configuration to obtain config values. */
+public class HadoopConfigProvider extends ConfigProvider {
+ private final Configuration conf;
+
+ public HadoopConfigProvider(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String get(String name) {
+ String value = conf.get(name);
+ if (value == null) {
+ throw new NoSuchElementException(name);
+ }
+ return value;
+ }
+}