diff options
author | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
---|---|---|
committer | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
commit | 91a563608e301bb243fca3765d569bde65ad747c (patch) | |
tree | 1dc3da48852d2677846d415b96b4ee0558a54cb5 /yarn/stable/src | |
parent | 93a65e5fde64ffed3dbd2a050c1007e077ecd004 (diff) | |
parent | 288a878999848adb130041d1e40c14bfc879cec6 (diff) | |
download | spark-91a563608e301bb243fca3765d569bde65ad747c.tar.gz spark-91a563608e301bb243fca3765d569bde65ad747c.tar.bz2 spark-91a563608e301bb243fca3765d569bde65ad747c.zip |
Merge branch 'master' into remove_simpleredundantreturn_scala
Diffstat (limited to 'yarn/stable/src')
3 files changed, 31 insertions, 11 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 69ae14ce83..4b777d5fa7 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -116,14 +116,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // local dirs, so lets check both. We assume one of the 2 is set. // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) - .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) - - if (localDirs.isEmpty()) { - throw new Exception("Yarn Local dirs can't be empty") + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l } - localDirs - } + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index efeee31acd..c084485734 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -99,6 +99,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) + appContext.setApplicationType("SPARK") // Memory for the ApplicationMaster. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 49248a8516..78353224fa 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() @@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -110,6 +116,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l + } + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) |