-rw-r--r--  common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 56
-rw-r--r--  yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 12
2 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index df082e4a92..43c8df721d 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
@@ -159,8 +160,7 @@ public class YarnShuffleService extends AuxiliaryService {
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
- registeredExecutorFile =
- new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+ registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -196,7 +196,7 @@ public class YarnShuffleService extends AuxiliaryService {
private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
- secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
+ secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
// Make sure this is protected in case it's not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
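
The comment in the hunk above notes that the secrets DB must be protected when it does not live in the NM recovery dir, and the hunk obtains a local FileSystem for that purpose. A minimal standalone sketch of such protection, assuming Hadoop's local FileSystem API (the class, helper name, and the 0700 mode are illustrative assumptions, not necessarily the patch's exact code):

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    class SecretsFileProtection {
      // Restrict the secrets DB to the service user; a plain YARN local dir
      // gives no such guarantee on its own.
      static void protect(File secretsFile, Configuration conf) throws IOException {
        FileSystem fs = FileSystem.getLocal(conf);
        fs.setPermission(new Path(secretsFile.toURI()), new FsPermission((short) 0700));
      }
    }
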
@@ -328,37 +328,59 @@ public class YarnShuffleService extends AuxiliaryService {
}
/**
- * Get the recovery path, this will override the default one to get our own maintained
- * recovery path.
+ * Get the path specific to this auxiliary service to use for recovery.
+ */
+ protected Path getRecoveryPath(String fileName) {
+ return _recoveryPath;
+ }
+
+ /**
+ * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
+ * when it previously was not. If YARN NM recovery is enabled it uses that path; otherwise
+ * it uses a YARN local dir.
*/
- protected Path getRecoveryPath() {
+ protected File initRecoveryDb(String dbFileName) {
+ if (_recoveryPath != null) {
+ File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
+ if (recoveryFile.exists()) {
+ return recoveryFile;
+ }
+ }
+ // The DB doesn't exist in the recovery path, so check the NM local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
- File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+ File f = new File(new Path(dir).toUri().getPath(), dbFileName);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
+ return f;
} else {
- // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
- // means old version of Spark already generated the recovery file, we should copy the
- // old file in to a new recovery path for the compatibility.
- if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
- // Fail to move recovery file to new path
- logger.error("Failed to move recovery file {} to the path {}",
- RECOVERY_FILE_NAME, _recoveryPath.toString());
+ // If the recovery path is set, then either NM recovery is enabled or another recovery
+ // DB has already been initialized. If NM recovery is enabled and set the recovery path,
+ // make sure to move all DBs to the recovery path from the old NM local dirs.
+ // If another DB was initialized first, just make sure all the DBs end up in the same
+ // location.
+ File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
+ if (!newLoc.equals(f)) {
+ try {
+ Files.move(f.toPath(), newLoc.toPath());
+ } catch (Exception e) {
+ // Failed to move the recovery file to the new path; just continue with the new DB location
+ logger.error("Failed to move recovery file {} to the path {}",
+ dbFileName, _recoveryPath.toString(), e);
+ }
}
+ return newLoc;
}
- break;
}
}
-
if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}
- return _recoveryPath;
+ return new File(_recoveryPath.toUri().getPath(), dbFileName);
}
/**
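
The core of the new initRecoveryDb is the migration step: if an old DB is found in a NM local dir while a recovery path is configured, the DB is moved there with java.nio.file.Files.move, and a failed move simply falls back to a fresh DB at the new location. A minimal standalone sketch of that pattern (the migrate method is a hypothetical extraction, not the patch's exact code):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    class RecoveryDbMigration {
      // Move an old recovery DB found in a NM local dir into the configured
      // recovery path; on failure, continue with an empty DB at the new location.
      static File migrate(File oldDb, File recoveryDir, String dbFileName) {
        File newLoc = new File(recoveryDir, dbFileName);
        if (!newLoc.equals(oldDb)) {
          try {
            Files.move(oldDb.toPath(), newLoc.toPath());
          } catch (IOException e) {
            // Old registration state is lost, but the service can still start.
          }
        }
        return newLoc;
      }
    }
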
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 9a071862bb..c86bf7f70c 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.stop()
}
- test("moving recovery file form NM local dir to recovery path") {
+ test("moving recovery file from NM local dir to recovery path") {
// This tests that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, we move the
// old recovery file to the new path to keep compatibility
// Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
// dir.
s1 = new YarnShuffleService
+ // set auth to true to test the secrets recovery
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
val app1Data: ApplicationInitializationContext =
@@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
+ val secretsFile = s1.secretsFile
+ secretsFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
@@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.init(yarnConfig)
val execStateFile2 = s2.registeredExecutorFile
+ val secretsFile2 = s2.secretsFile
+
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+ recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!execStateFile.exists())
}
+ eventually(timeout(10 seconds), interval(5 millis)) {
+ assert(!secretsFile.exists())
+ }
val handler2 = s2.blockHandler
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
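
The eventually blocks poll because the old files' removal may lag service initialization, so the test waits rather than asserting immediately. The same wait-until-moved idea as a minimal Java sketch (the awaitMoved helper is hypothetical, not part of the patch):

    import java.io.File;

    class AwaitMove {
      // Poll until the old recovery DB has been moved away, failing after a timeout.
      static void awaitMoved(File oldDb, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (oldDb.exists()) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("recovery DB was not moved: " + oldDb);
          }
          Thread.sleep(5);
        }
      }
    }
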