From b7b5e17876f65c6644505c356f1a0db24ce1d142 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 14 Jul 2016 09:42:32 -0500 Subject: [SPARK-16505][YARN] Optionally propagate error during shuffle service startup. This prevents the NM from starting when something is wrong, which would lead to later errors which are confusing and harder to debug. Added a unit test to verify startup fails if something is wrong. Author: Marcelo Vanzin Closes #14162 from vanzin/SPARK-16505. --- .../spark/network/yarn/YarnShuffleService.java | 75 +++++++++++++--------- 1 file changed, 43 insertions(+), 32 deletions(-) (limited to 'common/network-yarn') diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 8a05628c8f..df17dacdef 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService { private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb"; + // Whether failure during service initialization should stop the NM. + @VisibleForTesting + static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure"; + private static final boolean DEFAULT_STOP_ON_FAILURE = false; + // An entity that manages the shuffle secret per application // This is used only if authentication is enabled private ShuffleSecretManager secretManager; @@ -119,44 +124,50 @@ public class YarnShuffleService extends AuxiliaryService { * Start the shuffle server with the given configuration. */ @Override - protected void serviceInit(Configuration conf) { + protected void serviceInit(Configuration conf) throws Exception { _conf = conf; - // In case this NM was killed while there were running spark applications, we need to restore - // lost state for the existing executors. We look for an existing file in the NM's local dirs. - // If we don't find one, then we choose a file to use to save the state next time. Even if - // an application was stopped while the NM was down, we expect yarn to call stopApplication() - // when it comes back - registeredExecutorFile = - new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); - - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); - // If authentication is enabled, set up the shuffle server to use a - // special RPC handler that filters out unauthenticated fetch requests - boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + try { + // In case this NM was killed while there were running spark applications, we need to restore + // lost state for the existing executors. We look for an existing file in the NM's local dirs. + // If we don't find one, then we choose a file to use to save the state next time. Even if + // an application was stopped while the NM was down, we expect yarn to call stopApplication() + // when it comes back + registeredExecutorFile = + new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); + + TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - } catch (Exception e) { - logger.error("Failed to initialize external shuffle service", e); - } - List bootstraps = Lists.newArrayList(); - if (authEnabled) { - secretManager = new ShuffleSecretManager(); - bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); - } + // If authentication is enabled, set up the shuffle server to use a + // special RPC handler that filters out unauthenticated fetch requests + boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + List bootstraps = Lists.newArrayList(); + if (authEnabled) { + secretManager = new ShuffleSecretManager(); + bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); + } - int port = conf.getInt( - SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportContext transportContext = new TransportContext(transportConf, blockHandler); - shuffleServer = transportContext.createServer(port, bootstraps); - // the port should normally be fixed, but for tests its useful to find an open port - port = shuffleServer.getPort(); - boundPort = port; - String authEnabledString = authEnabled ? "enabled" : "not enabled"; - logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, - registeredExecutorFile); + int port = conf.getInt( + SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); + TransportContext transportContext = new TransportContext(transportConf, blockHandler); + shuffleServer = transportContext.createServer(port, bootstraps); + // the port should normally be fixed, but for tests its useful to find an open port + port = shuffleServer.getPort(); + boundPort = port; + String authEnabledString = authEnabled ? "enabled" : "not enabled"; + logger.info("Started YARN shuffle service for Spark on port {}. " + + "Authentication is {}. Registered executor file is {}", port, authEnabledString, + registeredExecutorFile); + } catch (Exception e) { + if (stopOnFailure) { + throw e; + } else { + noteFailure(e); + } + } } @Override -- cgit v1.2.3