aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-05-10 10:28:36 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-05-10 10:28:36 -0500
commitaab99d31a927adfa9216dd14e76493a187b6d6e7 (patch)
tree0b78c7ef829cd8e08bdbae2874e473f12bc4302b /common
parenta019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (diff)
downloadspark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.gz
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.bz2
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.zip
[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled
## What changes were proposed in this pull request? From Hadoop 2.5+, Yarn NM supports NM recovery which using recovery path for auxiliary services such as spark_shuffle, mapreduce_shuffle. So here change to use this path install of NM local dir if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao <sshao@hortonworks.com> Closes #12994 from jerryshao/SPARK-14963.
Diffstat (limited to 'common')
-rw-r--r--common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java64
1 files changed, 53 insertions, 11 deletions
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 4bc3c1a3c8..9807383ec3 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+ private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
+
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
@@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService {
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;
+ private Configuration _conf = null;
+
+ // The recovery path used to shuffle service recovery
+ @VisibleForTesting
+ Path _recoveryPath = null;
+
// Handles registering executors and opening shuffle blocks
@VisibleForTesting
ExternalShuffleBlockHandler blockHandler;
@@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+ _conf = conf;
// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
@@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService {
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
- findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
+ new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService {
logger.info("Stopping container {}", containerId);
}
- private File findRegisteredExecutorFile(String[] localDirs) {
- for (String dir: localDirs) {
- File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
- if (f.exists()) {
- return f;
- }
- }
- return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
- }
-
/**
* Close the shuffle server to clean up any associated state.
*/
@@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService {
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}
+
+ /**
+ * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
+ * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
+ * have to manually call this to set our own recovery path.
+ */
+ public void setRecoveryPath(Path recoveryPath) {
+ _recoveryPath = recoveryPath;
+ }
+
+ /**
+ * Get the recovery path, this will override the default one to get our own maintained
+ * recovery path.
+ */
+ protected Path getRecoveryPath() {
+ String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
+ for (String dir : localDirs) {
+ File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+ if (f.exists()) {
+ if (_recoveryPath == null) {
+ // If NM recovery is not enabled, we should specify the recovery path using NM local
+ // dirs, which is compatible with the old code.
+ _recoveryPath = new Path(dir);
+ } else {
+ // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
+ // means old version of Spark already generated the recovery file, we should copy the
+ // old file in to a new recovery path for the compatibility.
+ if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
+ // Fail to move recovery file to new path
+ logger.error("Failed to move recovery file {} to the path {}",
+ RECOVERY_FILE_NAME, _recoveryPath.toString());
+ }
+ }
+ break;
+ }
+ }
+
+ if (_recoveryPath == null) {
+ _recoveryPath = new Path(localDirs[0]);
+ }
+
+ return _recoveryPath;
+ }
}