From aab99d31a927adfa9216dd14e76493a187b6d6e7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 10 May 2016 10:28:36 -0500 Subject: [SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled ## What changes were proposed in this pull request? From Hadoop 2.5+, Yarn NM supports NM recovery which using recovery path for auxiliary services such as spark_shuffle, mapreduce_shuffle. So here change to use this path install of NM local dir if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao Closes #12994 from jerryshao/SPARK-14963. --- .../spark/network/yarn/YarnShuffleService.java | 64 ++++++++++--- .../network/yarn/YarnShuffleServiceSuite.scala | 101 ++++++++++++++++++--- 2 files changed, 143 insertions(+), 22 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 4bc3c1a3c8..9807383ec3 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService { private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; + private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb"; + // An entity that manages the shuffle secret per application // This is used only if authentication is enabled private ShuffleSecretManager secretManager; @@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; + private Configuration _conf = null; + + // The recovery path used to shuffle service recovery + @VisibleForTesting + Path _recoveryPath = null; + // Handles registering executors and opening shuffle blocks @VisibleForTesting ExternalShuffleBlockHandler blockHandler; @@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService { */ @Override protected void serviceInit(Configuration conf) { + _conf = conf; // In case this NM was killed while there were running spark applications, we need to restore // lost state for the existing executors. We look for an existing file in the NM's local dirs. @@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService { // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back registeredExecutorFile = - findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs")); + new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); // If authentication is enabled, set up the shuffle server to use a @@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService { logger.info("Stopping container {}", containerId); } - private File findRegisteredExecutorFile(String[] localDirs) { - for (String dir: localDirs) { - File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb"); - if (f.exists()) { - return f; - } - } - return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb"); - } - /** * Close the shuffle server to clean up any associated state. */ @@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService { public ByteBuffer getMetaData() { return ByteBuffer.allocate(0); } + + /** + * Set the recovery path for shuffle service recovery when NM is restarted. The method will be + * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we + * have to manually call this to set our own recovery path. + */ + public void setRecoveryPath(Path recoveryPath) { + _recoveryPath = recoveryPath; + } + + /** + * Get the recovery path, this will override the default one to get our own maintained + * recovery path. + */ + protected Path getRecoveryPath() { + String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); + for (String dir : localDirs) { + File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME); + if (f.exists()) { + if (_recoveryPath == null) { + // If NM recovery is not enabled, we should specify the recovery path using NM local + // dirs, which is compatible with the old code. + _recoveryPath = new Path(dir); + } else { + // If NM recovery is enabled and the recovery file exists in old NM local dirs, which + // means old version of Spark already generated the recovery file, we should copy the + // old file in to a new recovery path for the compatibility. + if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) { + // Fail to move recovery file to new path + logger.error("Failed to move recovery file {} to the path {}", + RECOVERY_FILE_NAME, _recoveryPath.toString()); + } + } + break; + } + } + + if (_recoveryPath == null) { + _recoveryPath = new Path(localDirs[0]); + } + + return _recoveryPath; + } } diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 0e433f6c1b..749e656e6d 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -19,16 +19,20 @@ package org.apache.spark.network.yarn import java.io.{DataOutputStream, File, FileOutputStream} import scala.annotation.tailrec +import scala.concurrent.duration._ -import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext} import org.scalatest.{BeforeAndAfterEach, Matchers} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts import org.apache.spark.SparkFunSuite import org.apache.spark.network.shuffle.ShuffleTestAccessor import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo +import org.apache.spark.util.Utils class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration @@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.setInt("spark.shuffle.service.port", 0) - - yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir => - val d = new File(dir) - if (d.exists()) { - FileUtils.deleteDirectory(d) - } - FileUtils.forceMkdir(d) - logInfo(s"creating yarn.nodemanager.local-dirs: $d") - } + val localDir = Utils.createTempDir() + yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath) } var s1: YarnShuffleService = null @@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s3.initializeApplication(app2Data) ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2)) s3.stop() + } + + test("get correct recovery path") { + // Test recovery path is set outside the shuffle service, this is to simulate NM recovery + // enabled scenario, where recovery path will be set by yarn. + s1 = new YarnShuffleService + val recoveryPath = new Path(Utils.createTempDir().toURI) + s1.setRecoveryPath(recoveryPath) + + s1.init(yarnConfig) + s1._recoveryPath should be (recoveryPath) + s1.stop() + // Test recovery path is set inside the shuffle service, this will be happened when NM + // recovery is not enabled or there's no NM recovery (Hadoop 2.5-). + s2 = new YarnShuffleService + s2.init(yarnConfig) + s2._recoveryPath should be + (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0))) + s2.stop() } -} + test("moving recovery file form NM local dir to recovery path") { + // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move + // old recovery file to the new path to keep compatibility + + // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local + // dir. + s1 = new YarnShuffleService + s1.init(yarnConfig) + val app1Id = ApplicationId.newInstance(0, 1) + val app1Data: ApplicationInitializationContext = + new ApplicationInitializationContext("user", app1Id, null) + s1.initializeApplication(app1Data) + val app2Id = ApplicationId.newInstance(0, 2) + val app2Data: ApplicationInitializationContext = + new ApplicationInitializationContext("user", app2Id, null) + s1.initializeApplication(app2Data) + + val execStateFile = s1.registeredExecutorFile + execStateFile should not be (null) + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) + + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile) + + blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1) + blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be (Some(shuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should + be (Some(shuffleInfo2)) + + assert(execStateFile.exists(), s"$execStateFile did not exist") + + s1.stop() + + // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled. + assert(execStateFile.exists()) + val recoveryPath = new Path(Utils.createTempDir().toURI) + s2 = new YarnShuffleService + s2.setRecoveryPath(recoveryPath) + s2.init(yarnConfig) + + val execStateFile2 = s2.registeredExecutorFile + recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString) + eventually(timeout(10 seconds), interval(5 millis)) { + assert(!execStateFile.exists()) + } + + val handler2 = s2.blockHandler + val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2) + + // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped + // during the restart + // Since recovery file is got from old path, so the previous state should be stored. + s2.initializeApplication(app1Data) + s2.stopApplication(new ApplicationTerminationContext(app2Id)) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None) + + s2.stop() + } + } -- cgit v1.2.3