diff options
author | Thomas Graves <tgraves@apache.org> | 2014-01-09 09:53:51 -0600 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-01-09 10:24:35 -0600 |
commit | c617083e478e3cfbddc4232060aa7b7a0c5812d4 (patch) | |
tree | 1614e57257d4fccef5aff928a9faa89a8a387861 /yarn | |
parent | 365cac94652cd012bf3783f74eed98c95b884bbb (diff) | |
download | spark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.tar.gz spark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.tar.bz2 spark-c617083e478e3cfbddc4232060aa7b7a0c5812d4.zip |
yarn-client addJar fix and misc other
Diffstat (limited to 'yarn')
3 files changed, 77 insertions, 31 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac..66e38ee840 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() @@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -119,6 +125,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + localDirs + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 4b1b5da048..22e55e0c60 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable.ArrayBuffer + private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) @@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { + Option(System.getenv(optionalParam)) foreach { + optParam => { + arrayBuf += (optionName, optParam) + } + } + } + override def start() { super.start() - val defalutWorkerCores = "2" - val defalutWorkerMemory = "512m" - val defaultWorkerNumber = "1" - val userJar = System.getenv("SPARK_YARN_APP_JAR") - val distFiles = System.getenv("SPARK_YARN_DIST_FILES") - var workerCores = System.getenv("SPARK_WORKER_CORES") - var workerMemory = System.getenv("SPARK_WORKER_MEMORY") - var workerNumber = System.getenv("SPARK_WORKER_INSTANCES") - if (userJar == null) throw new SparkException("env SPARK_YARN_APP_JAR is not set") - if (workerCores == null) - workerCores = defalutWorkerCores - if (workerMemory == null) - workerMemory = defalutWorkerMemory - if (workerNumber == null) - workerNumber = defaultWorkerNumber - val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort - val argsArray = Array[String]( + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ( "--class", "notused", "--jar", userJar, "--args", hostport, - "--worker-memory", workerMemory, - "--worker-cores", workerCores, - "--num-workers", workerNumber, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher", - "--files", distFiles + "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" ) - val args = new ClientArguments(argsArray, conf) + // process any optional arguments, use the defaults already defined in ClientArguments + // if things aren't specified + Map("--master-memory" -> "SPARK_MASTER_MEMORY", + "--num-workers" -> "SPARK_WORKER_INSTANCES", + "--worker-memory" -> "SPARK_WORKER_MEMORY", + "--worker-cores" -> "SPARK_WORKER_CORES", + "--queue" -> "SPARK_YARN_QUEUE", + "--name" -> "SPARK_YARN_APP_NAME", + "--files" -> "SPARK_YARN_DIST_FILES", + "--archives" -> "SPARK_YARN_DIST_ARCHIVES") + .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + + logDebug("ClientArguments called with: " + argsArrayBuf) + val args = new ClientArguments(argsArrayBuf.toArray, conf) client = new Client(args, conf) appId = client.runApp() waitForApp() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 49248a8516..3e3a4672b4 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() @@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -110,6 +116,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + localDirs + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) |