diff options
author | jerryshao <sshao@hortonworks.com> | 2016-05-10 10:28:36 -0500 |
---|---|---|
committer | Tom Graves <tgraves@yahoo-inc.com> | 2016-05-10 10:28:36 -0500 |
commit | aab99d31a927adfa9216dd14e76493a187b6d6e7 (patch) | |
tree | 0b78c7ef829cd8e08bdbae2874e473f12bc4302b /yarn/src/test/scala | |
parent | a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (diff) | |
download | spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.gz spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.bz2 spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.zip |
[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled
## What changes were proposed in this pull request?
From Hadoop 2.5+, the Yarn NM supports NM recovery, which uses a recovery path for auxiliary services such as spark_shuffle and mapreduce_shuffle. So here we change to use this path instead of the NM local dir if NM recovery is enabled.
## How was this patch tested?
Unit test + local test.
Author: jerryshao <sshao@hortonworks.com>
Closes #12994 from jerryshao/SPARK-14963.
Diffstat (limited to 'yarn/src/test/scala')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 101 |
1 files changed, 90 insertions, 11 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 0e433f6c1b..749e656e6d 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -19,16 +19,20 @@ package org.apache.spark.network.yarn import java.io.{DataOutputStream, File, FileOutputStream} import scala.annotation.tailrec +import scala.concurrent.duration._ -import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext} import org.scalatest.{BeforeAndAfterEach, Matchers} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts import org.apache.spark.SparkFunSuite import org.apache.spark.network.shuffle.ShuffleTestAccessor import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo +import org.apache.spark.util.Utils class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration @@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.setInt("spark.shuffle.service.port", 0) - - yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir => - val d = new File(dir) - if (d.exists()) { - FileUtils.deleteDirectory(d) - } - FileUtils.forceMkdir(d) - logInfo(s"creating yarn.nodemanager.local-dirs: $d") - } + val localDir = Utils.createTempDir() + yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath) } var 
s1: YarnShuffleService = null @@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s3.initializeApplication(app2Data) ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2)) s3.stop() + } + + test("get correct recovery path") { + // Test recovery path is set outside the shuffle service, this is to simulate NM recovery + // enabled scenario, where recovery path will be set by yarn. + s1 = new YarnShuffleService + val recoveryPath = new Path(Utils.createTempDir().toURI) + s1.setRecoveryPath(recoveryPath) + + s1.init(yarnConfig) + s1._recoveryPath should be (recoveryPath) + s1.stop() + // Test recovery path is set inside the shuffle service, this will be happened when NM + // recovery is not enabled or there's no NM recovery (Hadoop 2.5-). + s2 = new YarnShuffleService + s2.init(yarnConfig) + s2._recoveryPath should be + (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0))) + s2.stop() } -} + test("moving recovery file form NM local dir to recovery path") { + // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move + // old recovery file to the new path to keep compatibility + + // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local + // dir. 
+ s1 = new YarnShuffleService + s1.init(yarnConfig) + val app1Id = ApplicationId.newInstance(0, 1) + val app1Data: ApplicationInitializationContext = + new ApplicationInitializationContext("user", app1Id, null) + s1.initializeApplication(app1Data) + val app2Id = ApplicationId.newInstance(0, 2) + val app2Data: ApplicationInitializationContext = + new ApplicationInitializationContext("user", app2Id, null) + s1.initializeApplication(app2Data) + + val execStateFile = s1.registeredExecutorFile + execStateFile should not be (null) + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) + + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile) + + blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1) + blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be (Some(shuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should + be (Some(shuffleInfo2)) + + assert(execStateFile.exists(), s"$execStateFile did not exist") + + s1.stop() + + // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled. 
+ assert(execStateFile.exists()) + val recoveryPath = new Path(Utils.createTempDir().toURI) + s2 = new YarnShuffleService + s2.setRecoveryPath(recoveryPath) + s2.init(yarnConfig) + + val execStateFile2 = s2.registeredExecutorFile + recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString) + eventually(timeout(10 seconds), interval(5 millis)) { + assert(!execStateFile.exists()) + } + + val handler2 = s2.blockHandler + val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2) + + // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped + // during the restart + // Since recovery file is got from old path, so the previous state should be stored. + s2.initializeApplication(app1Data) + s2.stopApplication(new ApplicationTerminationContext(app2Id)) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None) + + s2.stop() + } + } |