diff options
author | liguoqiang <liguoqiang@rd.tuan800.com> | 2014-01-03 15:34:24 +0800 |
---|---|---|
committer | liguoqiang <liguoqiang@rd.tuan800.com> | 2014-01-03 15:34:24 +0800 |
commit | b27b75f1c595139bdcebbadb43e89b0a7eadf2b5 (patch) | |
tree | b94e48423721ddf274ac035668d4b21ff32e2fde /yarn | |
parent | 010e72c079274cab7c86cbde3bc7fa5c447e2072 (diff) | |
download | spark-b27b75f1c595139bdcebbadb43e89b0a7eadf2b5.tar.gz spark-b27b75f1c595139bdcebbadb43e89b0a7eadf2b5.tar.bz2 spark-b27b75f1c595139bdcebbadb43e89b0a7eadf2b5.zip |
Modify spark on yarn to create SparkConf process
Diffstat (limited to 'yarn')
4 files changed, 36 insertions, 27 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 69170c7427..2bb11e54c5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils -class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { +class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, + sparkConf: SparkConf) extends Logging { - def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) + def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = + this(args, new Configuration(), sparkConf) def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) @@ -126,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) + .getOrElse("")) if (localDirs.isEmpty()) { throw new Exception("Yarn Local dirs can't be empty") @@ -165,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s resourceManager.registerApplicationMaster(appMasterRequest) } - private def startUserClass(): Thread = { + private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( args.userClass, - false /* initialize */, + false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { @@ -231,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s yarnConf, resourceManager, appAttemptId, - args, + args, sparkContext.getConf) } } @@ -286,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s } private def launchReporterThread(_sleepTime: Long): Thread = { - val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime + val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime val t = new Thread { override def run() { @@ -385,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() } } + } object ApplicationMaster { @@ -394,6 +397,7 @@ object ApplicationMaster { // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. private val ALLOCATOR_LOOP_WAIT_COUNT = 30 + def incrementAllocatorLoop(by: Int) { val count = yarnAllocatorLoop.getAndAdd(by) if (count >= ALLOCATOR_LOOP_WAIT_COUNT) { @@ -432,6 +436,7 @@ object ApplicationMaster { // This is not only logs, but also ensures that log system is initialized for this instance // when we are actually 'run'-ing. logInfo("Adding shutdown hook for context " + sc) + override def run() { logInfo("Invoking sc stop from shutdown hook") sc.stop() @@ -440,7 +445,7 @@ object ApplicationMaster { master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } } - } ) + }) } // Wait for initialization to complete and atleast 'some' nodes can get allocated. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 525ea72762..6abb4d5017 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -45,9 +45,11 @@ import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil -class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging { +class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) + extends YarnClientImpl with Logging { - def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) + def this(args: ClientArguments, sparkConf: SparkConf) = + this(args, new Configuration(), sparkConf) def this(args: ClientArguments) = this(args, new SparkConf()) @@ -123,7 +125,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e clusterMetrics.getNumNodeManagers) val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) - logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s, + logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s, queueApplicationCount = %s, queueChildQueueCount = %s""".format( queueInfo.getQueueName, queueInfo.getCurrentCapacity, @@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("AM size is to large to run on this cluster " + amMem) + logError("AM size is to large to run on this cluster " + amMem) System.exit(1) } @@ -328,7 +330,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e val prefix = " --args " val args = clientArgs.userArgs val retval = new StringBuilder() - for (arg <- args){ + for (arg <- args) { retval.append(prefix).append(" '").append(arg).append("' ") } retval.toString @@ -467,10 +469,10 @@ object Client { // Note that anything with SPARK prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") - val sparkConf = new SparkConf - val args = new ClientArguments(argStrings,sparkConf) + val sparkConf = new SparkConf + val args = new ClientArguments(argStrings, sparkConf) - new Client(args,sparkConf).run + new Client(args, sparkConf).run } // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 09303ae5c2..8254d628fb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! -class ClientArguments(val args: Array[String],val sparkConf: SparkConf) { +class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var addJars: String = null var files: String = null var archives: String = null diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 1a792ddf66..300e78612e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { +class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) + extends Logging { - def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) + def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = + this(args, new Configuration(), sparkConf) def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) @@ -47,9 +49,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private var yarnAllocator: YarnAllocationHandler = null - private var driverClosed:Boolean = false + private var driverClosed: Boolean = false - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, conf = sparkConf)._1 var actor: ActorRef = null @@ -83,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar if (minimumMemory > 0) { val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) + val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 @@ -104,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // must be <= timeoutInterval/ 2. // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L)) reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -165,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar Thread.sleep(100) } } - sparkConf.set("spark.driver.host", driverHost) - sparkConf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -188,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - while(yarnAllocator.getNumWorkersRunning < args.numWorkers) { + while (yarnAllocator.getNumWorkersRunning < args.numWorkers) { yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) Thread.sleep(100) } @@ -199,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { - val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime + val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime val t = new Thread { override def run() { |