author | Patrick Wendell <pwendell@gmail.com> | 2014-01-02 19:06:40 -0800 |
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-02 19:06:40 -0800 |
commit | 498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95 (patch) | |
tree | 63e2bb81ba4ff47c72566c025a0d090f6d54f69d | |
parent | 0475ca8f81b6b8f21fdb841922cd9ab51cfc8cc3 (diff) | |
parent | fced7885cb6cd09761578f960540d739bcbb465a (diff) | |
download | spark-498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95.tar.gz spark-498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95.tar.bz2 spark-498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95.zip |
Merge pull request #323 from tgravescs/sparkconf_yarn_fix
Fix Spark on YARN after the SparkConf changes
This fixes Spark on YARN so that it compiles and works again after the SparkConf changes.
I also discovered other issues along the way that are still broken:
- Maven builds for YARN don't assemble correctly.
- An unset SPARK_EXAMPLES_JAR is no longer handled properly.
- I'm pretty sure spark.conf doesn't actually work, since it isn't distributed with YARN.
Those issues can be fixed in a separate PR unless others disagree. A sketch of the configuration pattern this change moves to is shown below, before the diff.
10 files changed, 101 insertions, 113 deletions
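The core pattern of the change: the YARN code now reads settings through a SparkConf instance with typed getters, while the YARN-approved local directories and an ephemeral web UI port are published as system properties so that the SparkConf created afterwards picks them up. The standalone sketch below is illustrative of that pattern, not the actual ApplicationMaster code; the placeholder local directory and the worker count are assumptions, only the config keys and defaults mirror the diff.

    import org.apache.spark.SparkConf

    object SparkConfPatternSketch {
      def main(args: Array[String]): Unit = {
        // YARN-approved local dirs and an ephemeral web UI port are published as
        // system properties; a SparkConf created afterwards picks up spark.* properties.
        System.setProperty("spark.local.dir", "/tmp/yarn-local")  // placeholder path, not from the diff
        System.setProperty("spark.ui.port", "0")

        // New read pattern: typed getters with typed defaults, replacing the old
        // conf.getOrElse("key", "stringDefault").toInt style on the Hadoop conf object.
        val sparkConf = new SparkConf()
        val numWorkers = 2  // stand-in for args.numWorkers
        val maxNumWorkerFailures =
          sparkConf.getInt("spark.yarn.max.worker.failures", math.max(numWorkers * 2, 3))
        val heartbeatIntervalMs =
          sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)

        println(s"maxNumWorkerFailures=$maxNumWorkerFailures heartbeatIntervalMs=$heartbeatIntervalMs")
      }
    }

Setting spark.ui.port to 0 asks for an ephemeral port, which avoids conflicts when several Spark processes land on the same YARN node.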
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 91e35e2d34..7c32e0ab9b 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils @@ -60,14 +60,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf() // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to YARN approved directories rather // than user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -89,8 +94,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. @@ -134,30 +137,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) } - private def waitForSparkMaster() { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while (!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". 
- format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -199,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var numTries = 0 val waitTime = 10000L - val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { logInfo("Waiting for Spark context initialization ... " + numTries) numTries = numTries + 1 @@ -215,7 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d". format(numTries * waitTime, maxNumTries)) @@ -223,7 +203,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, amClient, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -265,7 +246,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -343,7 +325,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1bba6a5ae4..a75066888c 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf + // Staging directory is private! 
-> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short) @@ -244,7 +246,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -437,7 +439,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -501,7 +503,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1a9bb97b3e..7aac2328da 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.{ArrayBuffer, HashMap} +import org.apache.spark.SparkConf import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} import org.apache.spark.util.IntParam import org.apache.spark.util.MemoryParam @@ -35,7 +36,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 // MB var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index f7d73f0d83..99b824e129 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -47,8 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var driverClosed:Boolean = false private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + 
conf = sparkConf)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -136,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -157,7 +159,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte amClient, appAttemptId, args, - preferredNodeLocationData) + preferredNodeLocationData, + sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index abc3447746..85ab08ef34 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -64,7 +64,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. 
Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set<containerid> @@ -254,8 +255,8 @@ private[yarn] class YarnAllocationHandler( } else { val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), + sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) @@ -565,7 +566,8 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments + args: ApplicationMasterArguments, + sparkConf: SparkConf ): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -575,7 +577,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -584,7 +587,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]] + collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -595,7 +599,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToSplitCount, - rackToSplitCount) + rackToSplitCount, + sparkConf) } def newAllocator( @@ -605,7 +610,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]] + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -616,7 +622,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map. 
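The new-yarn allocator changes above all follow one idea: YarnAllocationHandler and its newAllocator factories now take the SparkConf explicitly, so the driver host and port come from Spark settings rather than from the shared Hadoop Configuration. Below is a minimal stand-in sketching that threading; the class name, fields, sample values, and the hard-coded actor name are illustrative assumptions, while the config keys and the driver URL format mirror the diff.

    import org.apache.spark.SparkConf

    // Simplified stand-in for how YarnAllocationHandler now receives the SparkConf
    // explicitly and reads driver coordinates from it.
    class AllocatorSketch(val sparkConf: SparkConf, val workerCores: Int, val workerMemory: Int) {
      def driverUrl: String =
        "akka.tcp://spark@%s:%s/user/CoarseGrainedScheduler".format(  // actor name assumed here
          sparkConf.get("spark.driver.host"),
          sparkConf.get("spark.driver.port"))
    }

    object AllocatorSketch {
      // Factory in the spirit of YarnAllocationHandler.newAllocator: callers pass
      // the SparkConf down instead of relying on a global configuration.
      def newAllocator(sparkConf: SparkConf, workerCores: Int, workerMemory: Int): AllocatorSketch =
        new AllocatorSketch(sparkConf, workerCores, workerMemory)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.driver.host", "localhost")  // illustrative values
          .set("spark.driver.port", "7077")
        println(newAllocator(conf, workerCores = 1, workerMemory = 1024).driverUrl)
      }
    }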
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dc9228180f..7cf120d3eb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -36,10 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils - class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { def this(args: ApplicationMasterArguments) = this(args, new Configuration()) @@ -57,14 +56,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - // default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + + private val sparkConf = new SparkConf() + // Default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -99,8 +104,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after spark master is up and SparkContext is created so that we can register UI Url @@ -161,30 +164,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.registerApplicationMaster(appMasterRequest) } - private def waitForSparkMaster() { - logInfo("Waiting for spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while(!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". 
- format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -226,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 @@ -242,7 +221,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d". format(count * waitTime, numTries)) @@ -250,7 +230,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, resourceManager, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -294,7 +275,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -377,7 +358,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 595a7ee8c3..2bd047c97a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -54,6 +54,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf // Staging directory is private! 
-> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short) @@ -230,7 +231,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -422,7 +423,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -485,8 +486,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") - .toBoolean + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index e9e46a193b..9075ca71e7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import org.apache.spark.SparkConf import org.apache.spark.util.MemoryParam import org.apache.spark.util.IntParam import collection.mutable.{ArrayBuffer, HashMap} @@ -33,7 +34,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index c1e79cbe66..a8de89c670 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -46,8 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var yarnAllocator: YarnAllocationHandler = null private var driverClosed:Boolean = false + private val sparkConf = new SparkConf - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf)._1 var actor: ActorRef = null // This actor just working as 
a monitor to watch on Driver Actor. @@ -162,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -175,9 +177,11 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private def allocateWorkers() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map() + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map() - yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData) + yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, + args, preferredNodeLocationData, sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5966a0f757..c8af653b3f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -62,7 +62,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. 
Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set<containerid> @@ -239,7 +240,7 @@ private[yarn] class YarnAllocationHandler( // (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) @@ -552,7 +553,8 @@ object YarnAllocationHandler { conf: Configuration, resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments): YarnAllocationHandler = { + args: ApplicationMasterArguments, + sparkConf: SparkConf): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -562,7 +564,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -571,7 +574,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]]): YarnAllocationHandler = { + collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -582,7 +586,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } def newAllocator( @@ -592,7 +597,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) @@ -604,7 +610,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map. |
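The client-side reads follow the same pattern in both yarn/ and new-yarn/: typed getInt/getLong lookups with defaults, while String-valued booleans are still read with get(...).toBoolean. The sketch below consolidates those reads; the surrounding object and the printed summary are illustrative, and it assumes no overriding spark.* properties are set, so the defaults from the diff are returned.

    import org.apache.spark.SparkConf

    object ClientConfSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()

        // getInt/getLong replace the old getOrElse("key", "string").toX calls.
        val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
        val reportIntervalMs = sparkConf.getLong("spark.yarn.report.interval", 1000)

        // Boolean-valued settings still go through get(...).toBoolean in this change.
        val preserveStagingFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
        val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean

        println(s"replication=$replication reportIntervalMs=$reportIntervalMs " +
          s"preserveStaging=$preserveStagingFiles userClasspathFirst=$userClasspathFirst")
      }
    }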