From aab99d31a927adfa9216dd14e76493a187b6d6e7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 10 May 2016 10:28:36 -0500 Subject: [SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled ## What changes were proposed in this pull request? From Hadoop 2.5+, Yarn NM supports NM recovery, which uses a recovery path for auxiliary services such as spark_shuffle, mapreduce_shuffle. So here we change to use this path instead of the NM local dirs if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao Closes #12994 from jerryshao/SPARK-14963. --- .../spark/network/yarn/YarnShuffleService.java | 64 ++++++++++++++++++---- 1 file changed, 53 insertions(+), 11 deletions(-) (limited to 'common/network-yarn/src/main/java/org') diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 4bc3c1a3c8..9807383ec3 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService { private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; + private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb"; + // An entity that manages the shuffle secret per application // This is used only if authentication is enabled private ShuffleSecretManager secretManager; @@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; + private Configuration _conf = null; + + // The recovery path used for shuffle service recovery + @VisibleForTesting + Path _recoveryPath = null; + // Handles registering executors and opening 
shuffle blocks @VisibleForTesting ExternalShuffleBlockHandler blockHandler; @@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService { */ @Override protected void serviceInit(Configuration conf) { + _conf = conf; // In case this NM was killed while there were running spark applications, we need to restore // lost state for the existing executors. We look for an existing file in the NM's local dirs. @@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService { // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back registeredExecutorFile = - findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs")); + new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); // If authentication is enabled, set up the shuffle server to use a @@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService { logger.info("Stopping container {}", containerId); } - private File findRegisteredExecutorFile(String[] localDirs) { - for (String dir: localDirs) { - File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb"); - if (f.exists()) { - return f; - } - } - return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb"); - } - /** * Close the shuffle server to clean up any associated state. */ @@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService { public ByteBuffer getMetaData() { return ByteBuffer.allocate(0); } + + /** + * Set the recovery path for shuffle service recovery when NM is restarted. The method will be + * overridden and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we + * have to manually call this to set our own recovery path. 
+ */ + public void setRecoveryPath(Path recoveryPath) { + _recoveryPath = recoveryPath; + } + + /** + * Get the recovery path; this will override the default one to get our own maintained + * recovery path. + */ + protected Path getRecoveryPath() { + String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); + for (String dir : localDirs) { + File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME); + if (f.exists()) { + if (_recoveryPath == null) { + // If NM recovery is not enabled, we should specify the recovery path using NM local + // dirs, which is compatible with the old code. + _recoveryPath = new Path(dir); + } else { + // If NM recovery is enabled and the recovery file exists in old NM local dirs, which + // means an old version of Spark already generated the recovery file, we should move the + // old file into the new recovery path for compatibility. + if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) { + // Failed to move the recovery file to the new path + logger.error("Failed to move recovery file {} to the path {}", + RECOVERY_FILE_NAME, _recoveryPath.toString()); + } + } + break; + } + } + + if (_recoveryPath == null) { + _recoveryPath = new Path(localDirs[0]); + } + + return _recoveryPath; + } } -- cgit v1.2.3