From c6de982be69cd50e66375cfea3d6c3267a01583b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 2 Jan 2014 16:50:35 -0600 Subject: Fix yarn build after sparkConf changes --- .../spark/deploy/yarn/ApplicationMaster.scala | 55 +++++++--------------- .../org/apache/spark/deploy/yarn/Client.scala | 10 ++-- .../apache/spark/deploy/yarn/ClientArguments.scala | 3 +- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 11 +++-- .../spark/deploy/yarn/YarnAllocationHandler.scala | 25 ++++++---- 5 files changed, 48 insertions(+), 56 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dc9228180f..7cf120d3eb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -36,10 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils - class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { def this(args: ApplicationMasterArguments) = this(args, new Configuration()) @@ -57,14 +56,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - // default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + + private val sparkConf = new SparkConf() + // Default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -99,8 +104,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after spark master is up and SparkContext is created so that we can register UI Url @@ -161,30 +164,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.registerApplicationMaster(appMasterRequest) } - private def waitForSparkMaster() { - logInfo("Waiting for spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while(!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -226,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 @@ -242,7 +221,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d". format(count * waitTime, numTries)) @@ -250,7 +230,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, resourceManager, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -294,7 +275,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -377,7 +358,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 595a7ee8c3..2bd047c97a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -54,6 +54,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf // Staging directory is private! -> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short) @@ -230,7 +231,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -422,7 +423,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -485,8 +486,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") - .toBoolean + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index e9e46a193b..9075ca71e7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import org.apache.spark.SparkConf import org.apache.spark.util.MemoryParam import org.apache.spark.util.IntParam import collection.mutable.{ArrayBuffer, HashMap} @@ -33,7 +34,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index c1e79cbe66..28259de68f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -47,7 +47,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var yarnAllocator: YarnAllocationHandler = null private var driverClosed:Boolean = false - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = new SparkConf)._1 var actor: ActorRef = null // This actor just working as a monitor to watch on Driver Actor. @@ -175,9 +176,11 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private def allocateWorkers() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map() + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map() - yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData) + yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, + args, preferredNodeLocationData, new SparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5966a0f757..c8af653b3f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -62,7 +62,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set @@ -239,7 +240,7 @@ private[yarn] class YarnAllocationHandler( // (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) @@ -552,7 +553,8 @@ object YarnAllocationHandler { conf: Configuration, resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments): YarnAllocationHandler = { + args: ApplicationMasterArguments, + sparkConf: SparkConf): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -562,7 +564,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -571,7 +574,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]]): YarnAllocationHandler = { + collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -582,7 +586,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } def newAllocator( @@ -592,7 +597,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) @@ -604,7 +610,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map. -- cgit v1.2.3 From fced7885cb6cd09761578f960540d739bcbb465a Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 2 Jan 2014 17:11:16 -0600 Subject: fix yarn-client --- .../main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 9 +++++---- .../main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) (limited to 'yarn') diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 32c774c90e..99b824e129 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -47,9 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var driverClosed:Boolean = false private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = new SparkConf)._1 + conf = sparkConf)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -137,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -159,7 +160,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte appAttemptId, args, preferredNodeLocationData, - new SparkConf) + sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 28259de68f..a8de89c670 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -46,9 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var yarnAllocator: YarnAllocationHandler = null private var driverClosed:Boolean = false + private val sparkConf = new SparkConf val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = new SparkConf)._1 + conf = sparkConf)._1 var actor: ActorRef = null // This actor just working as a monitor to watch on Driver Actor. @@ -163,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -180,7 +181,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte scala.collection.immutable.Map() yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, - args, preferredNodeLocationData, new SparkConf) + args, preferredNodeLocationData, sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished -- cgit v1.2.3