-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                 |  37
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                      |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                |  16
-rwxr-xr-x  make-distribution.sh                                                                 |   3
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java | 117
-rw-r--r--  network/yarn/pom.xml                                                                 |  58
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java     | 176
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java |  42
-rw-r--r--  pom.xml                                                                              |   2
-rw-r--r--  project/SparkBuild.scala                                                             |   8
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala        |  16
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala       |  16
12 files changed, 483 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c11f1db006..ef93009a07 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
- verifyBounds()
// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
// How long an executor must be idle for before it is removed
- private val removeThresholdSeconds = conf.getLong(
+ private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)
+ // During testing, the methods to actually kill and add executors are mocked out
+ private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+ validateSettings()
+
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Polling loop interval (ms)
private val intervalMillis: Long = 100
- // Whether we are testing this class. This should only be used internally.
- private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
-
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
/**
- * Verify that the lower and upper bounds on the number of executors are valid.
+ * Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
- private def verifyBounds(): Unit = {
+ private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
+ if (schedulerBacklogTimeout <= 0) {
+ throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+ }
+ if (sustainedSchedulerBacklogTimeout <= 0) {
+ throw new SparkException(
+ "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+ }
+ if (executorIdleTimeout <= 0) {
+ throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
+ }
+ // Require external shuffle service for dynamic allocation
+ // Otherwise, we may lose shuffle files when killing executors
+ if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+ throw new SparkException("Dynamic allocation of executors requires the external " +
+ "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+ }
}
/**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+ s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
- removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+ s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}
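Taken together, the new checks in validateSettings() define the contract a SparkConf must meet before dynamic allocation will start. Below is a minimal sketch of a conforming configuration; the values are illustrative rather than defaults, and `spark.dynamicAllocation.enabled` is assumed to be the flag that makes SparkContext instantiate this manager.

```scala
import org.apache.spark.SparkConf

// Sketch: a configuration that passes every check in validateSettings().
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")      // assumed entry-point flag
  .set("spark.dynamicAllocation.minExecutors", "2")    // required, must be >= 0
  .set("spark.dynamicAllocation.maxExecutors", "20")   // required, must be >= minExecutors
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")          // seconds, > 0
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "60") // seconds, > 0
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")             // seconds, > 0
  .set("spark.shuffle.service.enabled", "true") // required unless spark.dynamicAllocation.testing
```

The shuffle service requirement exists because executors normally serve their own shuffle output: killing an idle executor would discard map output that later stages may still need, whereas serving the files from the NodeManager removes that coupling.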
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a5fb87b9b2..e48d7772d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.util._
private[spark] sealed trait BlockValues
@@ -97,7 +96,12 @@ private[spark] class BlockManager(
private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
- private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
+
+ // Port used by the external shuffle service. In Yarn mode, this may already be
+ // set through the Hadoop configuration as the server is launched in the Yarn NM.
+ private val externalShuffleServicePort =
+ Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+
// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6ab94af9f3..7caf6bcf94 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1780,6 +1781,21 @@ private[spark] object Utils extends Logging {
val manifest = new JarManifest(manifestUrl.openStream())
manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
}.getOrElse("Unknown")
+
+ /**
+ * Return the value of a config either through the SparkConf or the Hadoop configuration
+ * if this is Yarn mode. In the latter case, this defaults to the value set through SparkConf
+ * if the key is not set in the Hadoop configuration.
+ */
+ def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = {
+ val sparkValue = conf.get(key, default)
+ if (SparkHadoopUtil.get.isYarnMode) {
+ SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue)
+ } else {
+ sparkValue
+ }
+ }
+
}
/**
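BlockManager above is the first caller of this helper. A short sketch of how the lookup resolves (note that Utils is private[spark], so this only compiles from Spark-internal code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Sketch: resolving the shuffle service port the way BlockManager now does.
val conf = new SparkConf().set("spark.shuffle.service.port", "7447")
val port = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
// Not on Yarn                         => 7447 (the SparkConf value)
// On Yarn, key absent in Hadoop conf  => 7447 (falls back to the SparkConf value)
// On Yarn, key present in Hadoop conf => the Hadoop value, since the shuffle
//   server lives in the NodeManager and its yarn-site.xml is authoritative
```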
diff --git a/make-distribution.sh b/make-distribution.sh
index 0bc839e1db..fac7f7e284 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -181,6 +181,9 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI
# Copy jars
cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-shuffle*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-common*.jar "$DISTDIR/lib/"
# Copy example sources (needed for python and SQL)
mkdir -p "$DISTDIR/examples/src/main"
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
new file mode 100644
index 0000000000..e66c4af0f1
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.sasl.SecretKeyHolder;
+
+/**
+ * A class that manages the shuffle secrets used by the external shuffle service.
+ */
+public class ShuffleSecretManager implements SecretKeyHolder {
+ private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
+ private final ConcurrentHashMap<String, String> shuffleSecretMap;
+
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ // Spark user used for authenticating SASL connections
+ // Note that this must match the value in org.apache.spark.SecurityManager
+ private static final String SPARK_SASL_USER = "sparkSaslUser";
+
+ /**
+ * Convert the given string to a byte buffer. The resulting buffer can be converted back to
+ * the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
+ * shuffle service represents shuffle secrets as byte buffers instead of strings.
+ */
+ public static ByteBuffer stringToBytes(String s) {
+ return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
+ }
+
+ /**
+ * Convert the given byte buffer to a string. The resulting string can be converted back to
+ * the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
+ * shuffle service represents shuffle secrets as byte buffers instead of strings.
+ */
+ public static String bytesToString(ByteBuffer b) {
+ return UTF8_CHARSET.decode(b).toString();
+ }
+
+ public ShuffleSecretManager() {
+ shuffleSecretMap = new ConcurrentHashMap<String, String>();
+ }
+
+ /**
+ * Register an application with its secret.
+ * Executors need to first authenticate themselves with the same secret before
+ * fetching shuffle files written by other executors in this application.
+ */
+ public void registerApp(String appId, String shuffleSecret) {
+ if (!shuffleSecretMap.containsKey(appId)) {
+ shuffleSecretMap.put(appId, shuffleSecret);
+ logger.info("Registered shuffle secret for application {}", appId);
+ } else {
+ logger.debug("Application {} already registered", appId);
+ }
+ }
+
+ /**
+ * Register an application with its secret specified as a byte buffer.
+ */
+ public void registerApp(String appId, ByteBuffer shuffleSecret) {
+ registerApp(appId, bytesToString(shuffleSecret));
+ }
+
+ /**
+ * Unregister an application along with its secret.
+ * This is called when the application terminates.
+ */
+ public void unregisterApp(String appId) {
+ if (shuffleSecretMap.containsKey(appId)) {
+ shuffleSecretMap.remove(appId);
+ logger.info("Unregistered shuffle secret for application {}", appId);
+ } else {
+ logger.warn("Attempted to unregister application {} when it is not registered", appId);
+ }
+ }
+
+ /**
+ * Return the Spark user for authenticating SASL connections.
+ */
+ @Override
+ public String getSaslUser(String appId) {
+ return SPARK_SASL_USER;
+ }
+
+ /**
+ * Return the secret key registered with the given application.
+ * This key is used to authenticate the executors before they can fetch shuffle files
+ * written by this application from the external shuffle service. If the specified
+ * application is not registered, return null.
+ */
+ @Override
+ public String getSecretKey(String appId) {
+ return shuffleSecretMap.get(appId);
+ }
+}
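A brief usage sketch of the manager's life cycle, written in Scala against the Java API above; the application id is made up:

```scala
import org.apache.spark.network.sasl.ShuffleSecretManager

val secrets = new ShuffleSecretManager()

// Secrets travel as ByteBuffers (Yarn service data), so round-trip one.
val wire = ShuffleSecretManager.stringToBytes("s3cret")
secrets.registerApp("app_20141101_0001", wire)

// The SecretKeyHolder side, consulted by SaslRpcHandler during authentication:
assert(secrets.getSaslUser("app_20141101_0001") == "sparkSaslUser")
assert(secrets.getSecretKey("app_20141101_0001") == "s3cret")

secrets.unregisterApp("app_20141101_0001")
assert(secrets.getSecretKey("app_20141101_0001") == null) // unknown apps yield null
```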
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
new file mode 100644
index 0000000000..e60d8c1f78
--- /dev/null
+++ b/network/yarn/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-yarn_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Yarn Shuffle Service Code</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>network-yarn</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- Core dependencies -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Provided dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
new file mode 100644
index 0000000000..bb0b8f7e6c
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.ShuffleSecretManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.yarn.util.HadoopConfigProvider;
+
+/**
+ * An external shuffle service used by Spark on Yarn.
+ *
+ * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
+ * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
+ * The application also automatically derives the service port through `spark.shuffle.service.port`
+ * specified in the Yarn configuration. This is so that both the clients and the server agree on
+ * the same port to communicate on.
+ *
+ * The service also optionally supports authentication. This ensures that executors from one
+ * application cannot read the shuffle files written by those from another. This feature can be
+ * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
+ * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
+ * the case of the service port, will not inherit this setting from the Yarn configuration. This
+ * is because an application running on the same Yarn cluster may choose to not use the external
+ * shuffle service, in which case its setting of `spark.authenticate` should be independent of
+ * the service's.
+ */
+public class YarnShuffleService extends AuxiliaryService {
+ private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
+
+ // Port on which the shuffle server listens for fetch requests
+ private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
+ private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
+
+ // Whether the shuffle server should authenticate fetch requests
+ private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
+ private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+
+ // An entity that manages the shuffle secret per application
+ // This is used only if authentication is enabled
+ private ShuffleSecretManager secretManager;
+
+ // The actual server that serves shuffle files
+ private TransportServer shuffleServer = null;
+
+ public YarnShuffleService() {
+ super("spark_shuffle");
+ logger.info("Initializing YARN shuffle service for Spark");
+ }
+
+ /**
+ * Return whether authentication is enabled as specified by the configuration.
+ * If so, fetch requests will fail unless the appropriate authentication secret
+ * for the application is provided.
+ */
+ private boolean isAuthenticationEnabled() {
+ return secretManager != null;
+ }
+
+ /**
+ * Start the shuffle server with the given configuration.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) {
+ // If authentication is enabled, set up the shuffle server to use a
+ // special RPC handler that filters out unauthenticated fetch requests
+ boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+ if (authEnabled) {
+ secretManager = new ShuffleSecretManager();
+ rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
+ }
+
+ int port = conf.getInt(
+ SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+ TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+ TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
+ shuffleServer = transportContext.createServer(port);
+ String authEnabledString = authEnabled ? "enabled" : "not enabled";
+ logger.info("Started YARN shuffle service for Spark on port {}. " +
+ "Authentication is {}.", port, authEnabledString);
+ }
+
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ String appId = context.getApplicationId().toString();
+ try {
+ ByteBuffer shuffleSecret = context.getApplicationDataForService();
+ logger.info("Initializing application {}", appId);
+ if (isAuthenticationEnabled()) {
+ secretManager.registerApp(appId, shuffleSecret);
+ }
+ } catch (Exception e) {
+ logger.error("Exception when initializing application {}", appId, e);
+ }
+ }
+
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ String appId = context.getApplicationId().toString();
+ try {
+ logger.info("Stopping application {}", appId);
+ if (isAuthenticationEnabled()) {
+ secretManager.unregisterApp(appId);
+ }
+ } catch (Exception e) {
+ logger.error("Exception when stopping application {}", appId, e);
+ }
+ }
+
+ @Override
+ public void initializeContainer(ContainerInitializationContext context) {
+ ContainerId containerId = context.getContainerId();
+ logger.info("Initializing container {}", containerId);
+ }
+
+ @Override
+ public void stopContainer(ContainerTerminationContext context) {
+ ContainerId containerId = context.getContainerId();
+ logger.info("Stopping container {}", containerId);
+ }
+
+ /**
+ * Close the shuffle server to clean up any associated state.
+ */
+ @Override
+ protected void serviceStop() {
+ try {
+ if (shuffleServer != null) {
+ shuffleServer.close();
+ }
+ } catch (Exception e) {
+ logger.error("Exception when stopping service", e);
+ }
+ }
+
+ // Not currently used
+ @Override
+ public ByteBuffer getMetaData() {
+ return ByteBuffer.allocate(0);
+ }
+
+}
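The javadoc above covers the application side; on the NodeManager side the service is driven through Hadoop's standard service life cycle. A hedged sketch of that life cycle using the public Service API (in a real deployment the NM does this itself, with the service registered in yarn-site.xml under the `spark_shuffle` aux-service name the constructor declares):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.network.yarn.YarnShuffleService

// Sketch: the calls the NodeManager makes, with illustrative config values.
val hadoopConf = new Configuration()
hadoopConf.setInt("spark.shuffle.service.port", 7337)
hadoopConf.setBoolean("spark.authenticate", true) // wraps the handler in SaslRpcHandler

val service = new YarnShuffleService()
service.init(hadoopConf) // AbstractService.init -> serviceInit: starts the TransportServer
// ... the NM then calls initializeApplication/stopApplication as Spark apps come and go ...
service.stop()           // -> serviceStop: closes the TransportServer
```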
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
new file mode 100644
index 0000000000..884861752e
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn.util;
+
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.spark.network.util.ConfigProvider;
+
+/** Use the Hadoop configuration to obtain config values. */
+public class HadoopConfigProvider extends ConfigProvider {
+ private final Configuration conf;
+
+ public HadoopConfigProvider(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String get(String name) {
+ String value = conf.get(name);
+ if (value == null) {
+ throw new NoSuchElementException(name);
+ }
+ return value;
+ }
+}
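The provider's contract in brief: present keys come back as strings, absent keys throw NoSuchElementException, presumably so ConfigProvider's default-taking overload can fall back cleanly. A small sketch; the key names are hypothetical:

```scala
import java.util.NoSuchElementException

import org.apache.hadoop.conf.Configuration
import org.apache.spark.network.yarn.util.HadoopConfigProvider

val hadoopConf = new Configuration()
hadoopConf.set("spark.shuffle.io.example", "nio") // hypothetical key, for illustration

val provider = new HadoopConfigProvider(hadoopConf)
assert(provider.get("spark.shuffle.io.example") == "nio")
try {
  provider.get("spark.shuffle.io.unset") // absent => throws
} catch {
  case _: NoSuchElementException => // expected
}
```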
diff --git a/pom.xml b/pom.xml
index eb613531b8..88ef67c515 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1229,6 +1229,7 @@
<id>yarn-alpha</id>
<modules>
<module>yarn</module>
+ <module>network/yarn</module>
</modules>
</profile>
@@ -1236,6 +1237,7 @@
<id>yarn</id>
<modules>
<module>yarn</module>
+ <module>network/yarn</module>
</modules>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 33618f5401..657e4b4432 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -38,9 +38,9 @@ object BuildCommons {
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq").map(ProjectRef(buildLocation, _))
- val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) =
- Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl")
- .map(ProjectRef(buildLocation, _))
+ val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, networkYarn, java8Tests,
+ sparkGangliaLgpl, sparkKinesisAsl) = Seq("yarn", "yarn-stable", "yarn-alpha", "network-yarn",
+ "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
.map(ProjectRef(buildLocation, _))
@@ -143,7 +143,7 @@ object SparkBuild extends PomBuild {
// TODO: Add Sql to mima checks
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
- streamingFlumeSink, networkCommon, networkShuffle).contains(x)).foreach {
+ streamingFlumeSink, networkCommon, networkShuffle, networkYarn).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7ee4b5c842..5f47c79cab 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
@deprecated("use yarn/stable", "1.2.0")
class ExecutorRunnable(
@@ -90,6 +91,21 @@ class ExecutorRunnable(
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+ // If external shuffle service is enabled, register with the Yarn shuffle service already
+ // started on the NodeManager and, if authentication is enabled, provide it with our secret
+ // key for fetching shuffle files later
+ if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+ val secretString = securityMgr.getSecretKey()
+ val secretBytes =
+ if (secretString != null) {
+ ShuffleSecretManager.stringToBytes(secretString)
+ } else {
+ // Authentication is not enabled, so just provide dummy metadata
+ ByteBuffer.allocate(0)
+ }
+ ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+ }
+
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 0b5a92d87d..18f48b4b6c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
class ExecutorRunnable(
@@ -89,6 +90,21 @@ class ExecutorRunnable(
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+ // If external shuffle service is enabled, register with the Yarn shuffle service already
+ // started on the NodeManager and, if authentication is enabled, provide it with our secret
+ // key for fetching shuffle files later
+ if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+ val secretString = securityMgr.getSecretKey()
+ val secretBytes =
+ if (secretString != null) {
+ ShuffleSecretManager.stringToBytes(secretString)
+ } else {
+ // Authentication is not enabled, so just provide dummy metadata
+ ByteBuffer.allocate(0)
+ }
+ ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+ }
+
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx)
}
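The two ExecutorRunnable variants and YarnShuffleService.initializeApplication are the two halves of one handshake: the application encodes its secret as service-data bytes under the `spark_shuffle` key, and the NM-side service decodes them into its secret manager. A sketch of just that data path, with Yarn itself factored out:

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.sasl.ShuffleSecretManager

// Application side (ExecutorRunnable): encode the secret, or send empty
// metadata when authentication is off.
val secretString: String = "s3cret" // stand-in for securityMgr.getSecretKey()
val serviceData: ByteBuffer =
  if (secretString != null) ShuffleSecretManager.stringToBytes(secretString)
  else ByteBuffer.allocate(0)

// NodeManager side (YarnShuffleService.initializeApplication): the same bytes
// arrive via ApplicationInitializationContext.getApplicationDataForService.
val secrets = new ShuffleSecretManager()
secrets.registerApp("app_20141101_0001", serviceData)
assert(secrets.getSecretKey("app_20141101_0001") == "s3cret")
```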