aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-05-10 10:28:36 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-05-10 10:28:36 -0500
commitaab99d31a927adfa9216dd14e76493a187b6d6e7 (patch)
tree0b78c7ef829cd8e08bdbae2874e473f12bc4302b
parenta019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (diff)
downloadspark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.gz
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.bz2
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.zip
[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled
## What changes were proposed in this pull request? From Hadoop 2.5+, Yarn NM supports NM recovery which using recovery path for auxiliary services such as spark_shuffle, mapreduce_shuffle. So here change to use this path install of NM local dir if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao <sshao@hortonworks.com> Closes #12994 from jerryshao/SPARK-14963.
-rw-r--r--common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java64
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala101
2 files changed, 143 insertions, 22 deletions
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 4bc3c1a3c8..9807383ec3 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+ private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
+
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
@@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService {
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;
+ private Configuration _conf = null;
+
+ // The recovery path used to shuffle service recovery
+ @VisibleForTesting
+ Path _recoveryPath = null;
+
// Handles registering executors and opening shuffle blocks
@VisibleForTesting
ExternalShuffleBlockHandler blockHandler;
@@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+ _conf = conf;
// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
@@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService {
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
- findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
+ new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService {
logger.info("Stopping container {}", containerId);
}
- private File findRegisteredExecutorFile(String[] localDirs) {
- for (String dir: localDirs) {
- File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
- if (f.exists()) {
- return f;
- }
- }
- return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
- }
-
/**
* Close the shuffle server to clean up any associated state.
*/
@@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService {
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}
+
+ /**
+ * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
+ * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
+ * have to manually call this to set our own recovery path.
+ */
+ public void setRecoveryPath(Path recoveryPath) {
+ _recoveryPath = recoveryPath;
+ }
+
+ /**
+ * Get the recovery path, this will override the default one to get our own maintained
+ * recovery path.
+ */
+ protected Path getRecoveryPath() {
+ String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
+ for (String dir : localDirs) {
+ File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+ if (f.exists()) {
+ if (_recoveryPath == null) {
+ // If NM recovery is not enabled, we should specify the recovery path using NM local
+ // dirs, which is compatible with the old code.
+ _recoveryPath = new Path(dir);
+ } else {
+ // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
+ // means old version of Spark already generated the recovery file, we should copy the
+ // old file in to a new recovery path for the compatibility.
+ if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
+ // Fail to move recovery file to new path
+ logger.error("Failed to move recovery file {} to the path {}",
+ RECOVERY_FILE_NAME, _recoveryPath.toString());
+ }
+ }
+ break;
+ }
+ }
+
+ if (_recoveryPath == null) {
+ _recoveryPath = new Path(localDirs[0]);
+ }
+
+ return _recoveryPath;
+ }
}
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 0e433f6c1b..749e656e6d 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
import java.io.{DataOutputStream, File, FileOutputStream}
import scala.annotation.tailrec
+import scala.concurrent.duration._
-import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
import org.apache.spark.SparkFunSuite
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.util.Utils
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)
-
- yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
- val d = new File(dir)
- if (d.exists()) {
- FileUtils.deleteDirectory(d)
- }
- FileUtils.forceMkdir(d)
- logInfo(s"creating yarn.nodemanager.local-dirs: $d")
- }
+ val localDir = Utils.createTempDir()
+ yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
}
var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s3.initializeApplication(app2Data)
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
s3.stop()
+ }
+
+ test("get correct recovery path") {
+ // Test recovery path is set outside the shuffle service, this is to simulate NM recovery
+ // enabled scenario, where recovery path will be set by yarn.
+ s1 = new YarnShuffleService
+ val recoveryPath = new Path(Utils.createTempDir().toURI)
+ s1.setRecoveryPath(recoveryPath)
+
+ s1.init(yarnConfig)
+ s1._recoveryPath should be (recoveryPath)
+ s1.stop()
+ // Test recovery path is set inside the shuffle service, this will be happened when NM
+ // recovery is not enabled or there's no NM recovery (Hadoop 2.5-).
+ s2 = new YarnShuffleService
+ s2.init(yarnConfig)
+ s2._recoveryPath should be
+ (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
+ s2.stop()
}
-}
+ test("moving recovery file form NM local dir to recovery path") {
+ // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move
+ // old recovery file to the new path to keep compatibility
+
+ // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
+ // dir.
+ s1 = new YarnShuffleService
+ s1.init(yarnConfig)
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data: ApplicationInitializationContext =
+ new ApplicationInitializationContext("user", app1Id, null)
+ s1.initializeApplication(app1Data)
+ val app2Id = ApplicationId.newInstance(0, 2)
+ val app2Data: ApplicationInitializationContext =
+ new ApplicationInitializationContext("user", app2Id, null)
+ s1.initializeApplication(app2Data)
+
+ val execStateFile = s1.registeredExecutorFile
+ execStateFile should not be (null)
+ val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+ val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+ val blockHandler = s1.blockHandler
+ val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+ ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+ blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+ blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+ ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+ be (Some(shuffleInfo1))
+ ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+ be (Some(shuffleInfo2))
+
+ assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+ s1.stop()
+
+ // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled.
+ assert(execStateFile.exists())
+ val recoveryPath = new Path(Utils.createTempDir().toURI)
+ s2 = new YarnShuffleService
+ s2.setRecoveryPath(recoveryPath)
+ s2.init(yarnConfig)
+
+ val execStateFile2 = s2.registeredExecutorFile
+ recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+ eventually(timeout(10 seconds), interval(5 millis)) {
+ assert(!execStateFile.exists())
+ }
+
+ val handler2 = s2.blockHandler
+ val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+ // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
+ // during the restart
+ // Since recovery file is got from old path, so the previous state should be stored.
+ s2.initializeApplication(app1Data)
+ s2.stopApplication(new ApplicationTerminationContext(app2Id))
+ ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+ ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+ s2.stop()
+ }
+ }