aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-01-09 09:53:51 -0600
committerThomas Graves <tgraves@apache.org>2014-01-09 10:24:35 -0600
commitc617083e478e3cfbddc4232060aa7b7a0c5812d4 (patch)
tree1614e57257d4fccef5aff928a9faa89a8a387861 /yarn/alpha
parent365cac94652cd012bf3783f74eed98c95b884bbb (diff)
downloadspark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.tar.gz
spark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.tar.bz2
spark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.zip
yarn-client addJar fix and misc other
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala29
1 files changed, 25 insertions, 4 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index ddfec1a4ac..66e38ee840 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
def run() {
+ // Setup the directories so things go to yarn approved directories rather
+ // then user specified and /tmp.
+ System.setProperty("spark.local.dir", getLocalDirs())
+
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
@@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
@@ -119,6 +125,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
System.exit(0)
}
+ /** Get the Yarn approved local directories. */
+ private def getLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ localDirs
+ }
+
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)