From 010e72c079274cab7c86cbde3bc7fa5c447e2072 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Fri, 3 Jan 2014 15:01:38 +0800
Subject: Modify spark on yarn to create SparkConf process

---
 .../apache/spark/deploy/yarn/ApplicationMaster.scala    |  9 +++++----
 .../scala/org/apache/spark/deploy/yarn/Client.scala     | 18 ++++++++++--------
 .../org/apache/spark/deploy/yarn/ClientArguments.scala  |  4 ++--
 .../org/apache/spark/deploy/yarn/WorkerLauncher.scala   |  7 ++++---
 .../org/apache/spark/deploy/yarn/WorkerRunnable.scala   |  5 +++--
 .../spark/deploy/yarn/YarnAllocationHandler.scala       |  2 +-
 .../scheduler/cluster/YarnClientSchedulerBackend.scala  |  4 ++--
 7 files changed, 27 insertions(+), 22 deletions(-)

(limited to 'yarn')

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb..69170c7427 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -57,7 +59,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97a..525ea72762 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,17 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
+  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -307,7 +308,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
@@ -466,9 +467,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val args = new ClientArguments(argStrings)
+    val sparkConf = new SparkConf
+    val args = new ClientArguments(argStrings,sparkConf)
 
-    new Client(args).run
+    new Client(args,sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +480,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -486,7 +488,7 @@
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9075ca71e7..09303ae5c2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -34,7 +34,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index a8de89c670..1a792ddf66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = null
@@ -46,7 +48,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var yarnAllocator: YarnAllocationHandler = null
   private var driverClosed:Boolean = false
 
-  private val sparkConf = new SparkConf
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6a90cc51cf..5e5d0421ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3f..e91257be8e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
         }
 
         new Thread(
-          new WorkerRunnable(container, conf, driverUrl, workerId,
+          new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
             workerHostname, workerMemory, workerCores)
         ).start()
       }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b..324ef4616f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }
--
cgit v1.2.3
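The shape of the change is easiest to see from the driver entry point: a single SparkConf is now created once and the same instance is threaded through ClientArguments, Client, and the YARN-side classes, instead of each class building its own with "new SparkConf" and re-reading system properties at every construction site. Below is a minimal sketch of the resulting pattern, assuming the constructors in the diff above; the enclosing object MyYarnApp is hypothetical, while Client, ClientArguments, and the call shapes come from Client.main in this patch.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    // Hypothetical entry point mirroring Client.main in the patch: build the
    // SparkConf once (it snapshots spark.* system properties at construction)
    // and hand the same instance to every consumer.
    object MyYarnApp {
      def main(argStrings: Array[String]) {
        // Anything with the SPARK prefix gets propagated to remote processes.
        System.setProperty("SPARK_YARN_MODE", "true")

        val sparkConf = new SparkConf
        val args = new ClientArguments(argStrings, sparkConf)

        new Client(args, sparkConf).run
      }
    }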