 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 75
 docs/job-scheduling.md                                                                  | 13
 docs/running-on-yarn.md                                                                 | 31
 yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala         | 34
 4 files changed, 106 insertions(+), 47 deletions(-)
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 8a05628c8f..df17dacdef 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
+ // Whether failure during service initialization should stop the NM.
+ @VisibleForTesting
+ static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+ private static final boolean DEFAULT_STOP_ON_FAILURE = false;
+
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
@@ -119,44 +124,50 @@ public class YarnShuffleService extends AuxiliaryService {
* Start the shuffle server with the given configuration.
*/
@Override
- protected void serviceInit(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
_conf = conf;
- // In case this NM was killed while there were running spark applications, we need to restore
- // lost state for the existing executors. We look for an existing file in the NM's local dirs.
- // If we don't find one, then we choose a file to use to save the state next time. Even if
- // an application was stopped while the NM was down, we expect yarn to call stopApplication()
- // when it comes back
- registeredExecutorFile =
- new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
-
- TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
- // If authentication is enabled, set up the shuffle server to use a
- // special RPC handler that filters out unauthenticated fetch requests
- boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+
try {
+ // In case this NM was killed while there were running Spark applications, we need to restore
+ // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+ // If we don't find one, then we choose a file to use to save the state next time. Even if
+ // an application was stopped while the NM was down, we expect YARN to call stopApplication()
+ // when it comes back.
+ registeredExecutorFile =
+ new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+
+ TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
- } catch (Exception e) {
- logger.error("Failed to initialize external shuffle service", e);
- }
- List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
- if (authEnabled) {
- secretManager = new ShuffleSecretManager();
- bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
- }
+ // If authentication is enabled, set up the shuffle server to use a
+ // special RPC handler that filters out unauthenticated fetch requests
+ boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+ if (authEnabled) {
+ secretManager = new ShuffleSecretManager();
+ bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+ }
- int port = conf.getInt(
- SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportContext transportContext = new TransportContext(transportConf, blockHandler);
- shuffleServer = transportContext.createServer(port, bootstraps);
- // the port should normally be fixed, but for tests its useful to find an open port
- port = shuffleServer.getPort();
- boundPort = port;
- String authEnabledString = authEnabled ? "enabled" : "not enabled";
- logger.info("Started YARN shuffle service for Spark on port {}. " +
- "Authentication is {}. Registered executor file is {}", port, authEnabledString,
- registeredExecutorFile);
+ int port = conf.getInt(
+ SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+ TransportContext transportContext = new TransportContext(transportConf, blockHandler);
+ shuffleServer = transportContext.createServer(port, bootstraps);
+ // The port should normally be fixed, but for tests it's useful to find an open port
+ port = shuffleServer.getPort();
+ boundPort = port;
+ String authEnabledString = authEnabled ? "enabled" : "not enabled";
+ logger.info("Started YARN shuffle service for Spark on port {}. " +
+ "Authentication is {}. Registered executor file is {}", port, authEnabledString,
+ registeredExecutorFile);
+ } catch (Exception e) {
+ if (stopOnFailure) {
+ throw e;
+ } else {
+ noteFailure(e);
+ }
+ }
}
@Override
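
The error-handling pattern introduced above — rethrow when `spark.yarn.shuffle.stopOnFailure` is set, otherwise record the failure and let the NodeManager keep running — is not specific to the shuffle service. A minimal sketch of the same pattern for a hypothetical auxiliary service (`FailSoftAuxService` and its config key are invented for illustration; the lifecycle and `noteFailure` calls are from Hadoop's `AuxiliaryService`/`AbstractService` API):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

public class FailSoftAuxService extends AuxiliaryService {
  // Hypothetical key, mirroring spark.yarn.shuffle.stopOnFailure above.
  static final String STOP_ON_FAILURE_KEY = "example.aux.stopOnFailure";

  public FailSoftAuxService() {
    super("fail_soft_aux");
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, false);
    try {
      // Acquire resources that may legitimately fail: ports, local dirs, etc.
    } catch (Exception e) {
      if (stopOnFailure) {
        // Propagates out of init(); inside a NodeManager this aborts startup.
        throw e;
      } else {
        // Records the cause on the service; the NodeManager keeps running.
        noteFailure(e);
      }
    }
  }

  @Override
  public void initializeApplication(ApplicationInitializationContext ctx) { }

  @Override
  public void stopApplication(ApplicationTerminationContext ctx) { }

  @Override
  public ByteBuffer getMetaData() {
    return ByteBuffer.allocate(0);
  }
}
```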
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 40b6cd99cc..807944f20a 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -83,18 +83,7 @@ In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.
slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
through Marathon.
-In YARN mode, start the shuffle service on each `NodeManager` as follows:
-
-1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
-pre-packaged distribution.
-2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
-`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
-`lib` if you are using a distribution.
-2. Add this jar to the classpath of all `NodeManager`s in your cluster.
-3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
-then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-`org.apache.spark.network.yarn.YarnShuffleService`.
-4. Restart all `NodeManager`s in your cluster.
+In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service).
All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
`spark.shuffle.service.*` namespaces. For more detail, see the
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4e92042da6..befd3eaee9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -539,6 +539,37 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
```
+## Configuring the External Shuffle Service
+
+To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these
+instructions:
+
+1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
+pre-packaged distribution.
+1. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
+`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
+`lib` if you are using a distribution.
+1. Add this jar to the classpath of all `NodeManager`s in your cluster.
+1. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
+then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
+`org.apache.spark.network.yarn.YarnShuffleService`.
+1. Restart all `NodeManager`s in your cluster.
+
+The following extra configuration options are available when the shuffle service is running on YARN:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+ <td><code>spark.yarn.shuffle.stopOnFailure</code></td>
+ <td><code>false</code></td>
+ <td>
+ Whether to stop the NodeManager when there's a failure in the Spark Shuffle Service's
+ initialization. This prevents application failures caused by running containers on
+ NodeManagers where the Spark Shuffle Service is not running.
+ </td>
+</tr>
+</table>
+
## Launching your application with Apache Oozie
Apache Oozie can launch Spark applications as part of a workflow.
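
Putting the numbered steps and the new property together, a `yarn-site.xml` fragment for a NodeManager might look like the following. The `mapreduce_shuffle` entry is shown only as a typical pre-existing value; keep whatever aux services your cluster already lists:

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <!-- Optional: fail NodeManager startup if the shuffle service cannot start. -->
  <name>spark.yarn.shuffle.stopOnFailure</name>
  <value>true</value>
</property>
```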
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 5458fb9d2e..e123e78541 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -16,13 +16,17 @@
*/
package org.apache.spark.network.yarn
-import java.io.{DataOutputStream, File, FileOutputStream}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.service.ServiceStateException
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
@@ -45,7 +49,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)
val localDir = Utils.createTempDir()
- yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
+ yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
}
var s1: YarnShuffleService = null
@@ -316,4 +320,28 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.stop()
}
- }
+
+ test("service throws error if cannot start") {
+ // Create a different config with a read-only local dir.
+ val roConfig = new YarnConfiguration(yarnConfig)
+ val roDir = Utils.createTempDir()
+ Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
+ roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
+ roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+
+ // Try to start the shuffle service, it should fail.
+ val service = new YarnShuffleService()
+
+ try {
+ val error = intercept[ServiceStateException] {
+ service.init(roConfig)
+ }
+ assert(error.getCause().isInstanceOf[IOException])
+ } finally {
+ service.stop()
+ Files.setPosixFilePermissions(roDir.toPath(),
+ EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
+ }
+ }
+
+}
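
For contrast with the fail-fast test above, a sketch of a hypothetical companion test (not part of this patch) that could live in the same suite: with `spark.yarn.shuffle.stopOnFailure` left at its default of `false`, `init()` on the same read-only setup should record the failure via `noteFailure` rather than rethrow it.

```scala
test("service does not stop NM on failure by default") {
  val softConfig = new YarnConfiguration(yarnConfig)
  val roDir = Utils.createTempDir()
  Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
  softConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
  // STOP_ON_FAILURE_KEY deliberately left unset: defaults to false.

  val service = new YarnShuffleService()
  try {
    service.init(softConfig)  // failure is recorded via noteFailure, not rethrown
  } finally {
    service.stop()
    Files.setPosixFilePermissions(roDir.toPath(),
      EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
  }
}
```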