diff options
author | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
---|---|---|
committer | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
commit | 91a563608e301bb243fca3765d569bde65ad747c (patch) | |
tree | 1dc3da48852d2677846d415b96b4ee0558a54cb5 /yarn/alpha | |
parent | 93a65e5fde64ffed3dbd2a050c1007e077ecd004 (diff) | |
parent | 288a878999848adb130041d1e40c14bfc879cec6 (diff) | |
download | spark-91a563608e301bb243fca3765d569bde65ad747c.tar.gz spark-91a563608e301bb243fca3765d569bde65ad747c.tar.bz2 spark-91a563608e301bb243fca3765d569bde65ad747c.zip |
Merge branch 'master' into remove_simpleredundantreturn_scala
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 13 | ||||
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 28 |
2 files changed, 30 insertions, 11 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2bb11e54c5..2e46d750c4 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -127,14 +127,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // local dirs, so lets check both. We assume one of the 2 is set. // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) - .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) - - if (localDirs.isEmpty()) { - throw new Exception("Yarn Local dirs can't be empty") + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l } - localDirs - } + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 0138d7ade1..9fe4d64a0f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() @@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -119,6 +125,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l + } + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV) |