From c6de982be69cd50e66375cfea3d6c3267a01583b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 2 Jan 2014 16:50:35 -0600 Subject: Fix yarn build after sparkConf changes --- .../spark/deploy/yarn/ApplicationMaster.scala | 52 +++++++--------------- .../org/apache/spark/deploy/yarn/Client.scala | 10 +++-- .../apache/spark/deploy/yarn/ClientArguments.scala | 3 +- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 8 ++-- .../spark/deploy/yarn/YarnAllocationHandler.scala | 27 ++++++----- 5 files changed, 47 insertions(+), 53 deletions(-) (limited to 'new-yarn/src/main/scala/org/apache') diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 91e35e2d34..7c32e0ab9b 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils @@ -60,14 +60,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf() // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to YARN approved directories rather // than user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -89,8 +94,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. @@ -134,30 +137,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) } - private def waitForSparkMaster() { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while (!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -199,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var numTries = 0 val waitTime = 10000L - val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { logInfo("Waiting for Spark context initialization ... " + numTries) numTries = numTries + 1 @@ -215,7 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d". format(numTries * waitTime, maxNumTries)) @@ -223,7 +203,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, amClient, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -265,7 +246,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -343,7 +325,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1bba6a5ae4..a75066888c 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf + // Staging directory is private! -> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short) @@ -244,7 +246,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -437,7 +439,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -501,7 +503,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1a9bb97b3e..7aac2328da 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.{ArrayBuffer, HashMap} +import org.apache.spark.SparkConf import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} import org.apache.spark.util.IntParam import org.apache.spark.util.MemoryParam @@ -35,7 +36,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 // MB var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index f7d73f0d83..32c774c90e 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -48,7 +48,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var amClient: AMRMClient[ContainerRequest] = _ - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = new SparkConf)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -157,7 +158,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte amClient, appAttemptId, args, - preferredNodeLocationData) + preferredNodeLocationData, + new SparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index abc3447746..85ab08ef34 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -64,7 +64,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set @@ -254,8 +255,8 @@ private[yarn] class YarnAllocationHandler( } else { val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), + sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) @@ -565,7 +566,8 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments + args: ApplicationMasterArguments, + sparkConf: SparkConf ): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -575,7 +577,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -584,7 +587,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]] + collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -595,7 +599,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToSplitCount, - rackToSplitCount) + rackToSplitCount, + sparkConf) } def newAllocator( @@ -605,7 +610,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]] + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -616,7 +622,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map. -- cgit v1.2.3