aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-05-10 10:28:36 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-05-10 10:28:36 -0500
commitaab99d31a927adfa9216dd14e76493a187b6d6e7 (patch)
tree0b78c7ef829cd8e08bdbae2874e473f12bc4302b /yarn
parenta019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (diff)
downloadspark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.gz
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.tar.bz2
spark-aab99d31a927adfa9216dd14e76493a187b6d6e7.zip
[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled
## What changes were proposed in this pull request? From Hadoop 2.5+, the Yarn NM supports NM recovery, which uses a recovery path for auxiliary services such as spark_shuffle and mapreduce_shuffle. So here we change to use this path instead of the NM local dir if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao <sshao@hortonworks.com> Closes #12994 from jerryshao/SPARK-14963.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala101
1 files changed, 90 insertions, 11 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 0e433f6c1b..749e656e6d 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
import java.io.{DataOutputStream, File, FileOutputStream}
import scala.annotation.tailrec
+import scala.concurrent.duration._
-import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
import org.apache.spark.SparkFunSuite
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.util.Utils
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)
-
- yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
- val d = new File(dir)
- if (d.exists()) {
- FileUtils.deleteDirectory(d)
- }
- FileUtils.forceMkdir(d)
- logInfo(s"creating yarn.nodemanager.local-dirs: $d")
- }
+ val localDir = Utils.createTempDir()
+ yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
}
var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s3.initializeApplication(app2Data)
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
s3.stop()
+ }
+
+ test("get correct recovery path") {
+ // Test recovery path is set outside the shuffle service; this simulates the scenario
+ // where NM recovery is enabled and the recovery path is set by YARN.
+ s1 = new YarnShuffleService
+ val recoveryPath = new Path(Utils.createTempDir().toURI)
+ s1.setRecoveryPath(recoveryPath)
+
+ s1.init(yarnConfig)
+ s1._recoveryPath should be (recoveryPath)
+ s1.stop()
+ // Test recovery path is set inside the shuffle service; this happens when NM recovery
+ // is not enabled or not supported (Hadoop 2.5-).
+ s2 = new YarnShuffleService
+ s2.init(yarnConfig)
+ // Keep the expected value inside the same statement: a trailing `should be` with the
+ // argument on the following line is parsed as two separate statements (semicolon
+ // inference), so the original assertion checked nothing. The open paren after `be`
+ // suppresses the statement break.
+ s2._recoveryPath should be (
+ new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
+ s2.stop()
}
-}
+ test("moving recovery file from NM local dir to recovery path") {
+ // This is to test that, when Hadoop is upgraded to 2.5+ and NM recovery is enabled, we
+ // move the old recovery file to the new path to keep compatibility.
+
+ // Simulate s1 running on an old version of Hadoop in which the recovery file lives in
+ // the NM local dir.
+ s1 = new YarnShuffleService
+ s1.init(yarnConfig)
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data: ApplicationInitializationContext =
+ new ApplicationInitializationContext("user", app1Id, null)
+ s1.initializeApplication(app1Data)
+ val app2Id = ApplicationId.newInstance(0, 2)
+ val app2Data: ApplicationInitializationContext =
+ new ApplicationInitializationContext("user", app2Id, null)
+ s1.initializeApplication(app2Data)
+
+ val execStateFile = s1.registeredExecutorFile
+ execStateFile should not be (null)
+ val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+ val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+ val blockHandler = s1.blockHandler
+ val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+ ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+ blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+ blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+ ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+ be (Some(shuffleInfo1))
+ ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+ be (Some(shuffleInfo2))
+
+ assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+ s1.stop()
+
+ // Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
+ assert(execStateFile.exists())
+ val recoveryPath = new Path(Utils.createTempDir().toURI)
+ s2 = new YarnShuffleService
+ s2.setRecoveryPath(recoveryPath)
+ s2.init(yarnConfig)
+
+ val execStateFile2 = s2.registeredExecutorFile
+ recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+ eventually(timeout(10 seconds), interval(5 millis)) {
+ assert(!execStateFile.exists())
+ }
+
+ val handler2 = s2.blockHandler
+ val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+ // Now we reinitialize only one of the apps, and expect YARN to tell us that app2 was
+ // stopped during the restart.
+ // Since the recovery file is read from the old path, the previous state should be
+ // preserved.
+ s2.initializeApplication(app1Data)
+ s2.stopApplication(new ApplicationTerminationContext(app2Id))
+ ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+ ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+ s2.stop()
+ }
+ }