aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java75
1 files changed, 43 insertions, 32 deletions
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 8a05628c8f..df17dacdef 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
+ // Whether failure during service initialization should stop the NM.
+ @VisibleForTesting
+ static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+ private static final boolean DEFAULT_STOP_ON_FAILURE = false;
+
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
@@ -119,44 +124,50 @@ public class YarnShuffleService extends AuxiliaryService {
* Start the shuffle server with the given configuration.
*/
@Override
- protected void serviceInit(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
_conf = conf;
- // In case this NM was killed while there were running spark applications, we need to restore
- // lost state for the existing executors. We look for an existing file in the NM's local dirs.
- // If we don't find one, then we choose a file to use to save the state next time. Even if
- // an application was stopped while the NM was down, we expect yarn to call stopApplication()
- // when it comes back
- registeredExecutorFile =
- new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
-
- TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
- // If authentication is enabled, set up the shuffle server to use a
- // special RPC handler that filters out unauthenticated fetch requests
- boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+
try {
+ // In case this NM was killed while there were running spark applications, we need to restore
+ // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+ // If we don't find one, then we choose a file to use to save the state next time. Even if
+ // an application was stopped while the NM was down, we expect yarn to call stopApplication()
+ // when it comes back
+ registeredExecutorFile =
+ new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+
+ TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
- } catch (Exception e) {
- logger.error("Failed to initialize external shuffle service", e);
- }
- List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
- if (authEnabled) {
- secretManager = new ShuffleSecretManager();
- bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
- }
+ // If authentication is enabled, set up the shuffle server to use a
+ // special RPC handler that filters out unauthenticated fetch requests
+ boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+ List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+ if (authEnabled) {
+ secretManager = new ShuffleSecretManager();
+ bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+ }
- int port = conf.getInt(
- SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportContext transportContext = new TransportContext(transportConf, blockHandler);
- shuffleServer = transportContext.createServer(port, bootstraps);
- // the port should normally be fixed, but for tests its useful to find an open port
- port = shuffleServer.getPort();
- boundPort = port;
- String authEnabledString = authEnabled ? "enabled" : "not enabled";
- logger.info("Started YARN shuffle service for Spark on port {}. " +
- "Authentication is {}. Registered executor file is {}", port, authEnabledString,
- registeredExecutorFile);
+ int port = conf.getInt(
+ SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+ TransportContext transportContext = new TransportContext(transportConf, blockHandler);
+ shuffleServer = transportContext.createServer(port, bootstraps);
+ // the port should normally be fixed, but for tests its useful to find an open port
+ port = shuffleServer.getPort();
+ boundPort = port;
+ String authEnabledString = authEnabled ? "enabled" : "not enabled";
+ logger.info("Started YARN shuffle service for Spark on port {}. " +
+ "Authentication is {}. Registered executor file is {}", port, authEnabledString,
+ registeredExecutorFile);
+ } catch (Exception e) {
+ if (stopOnFailure) {
+ throw e;
+ } else {
+ noteFailure(e);
+ }
+ }
}
@Override