13 files changed, 92 insertions, 66 deletions
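Summary: this change threads a single SparkConf through the YARN code paths (both the alpha and stable backends) instead of letting each class build its own with new SparkConf(). ApplicationMaster, Client, WorkerLauncher and WorkerRunnable now take the conf as a constructor parameter, with auxiliary constructors preserving the old entry points; ClientArguments and Client.populateClasspath accept it explicitly; YarnClientSchedulerBackend passes the SparkContext's conf down. The remaining hunks are whitespace and style cleanup.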
diff --git a/.gitignore b/.gitignore
index b3c4363af0..399362f7d3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 *~
 *.swp
+*.ipr
 *.iml
+*.iws
 .idea/
 .settings
 .cache
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb..2bb11e54c5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,15 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+    sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -57,7 +61,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
@@ -125,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
+        .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -164,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -230,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             yarnConf,
             resourceManager,
             appAttemptId,
-            args, 
+            args,
             sparkContext.getConf)
         }
       }
@@ -285,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -384,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -393,6 +397,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -431,6 +436,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -439,7 +445,7 @@ object ApplicationMaster {
           master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
         }
       }
-    } )
+    })
   }
 
   // Wait for initialization to complete and atleast 'some' nodes can get allocated.
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97a..6abb4d5017 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,19 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -122,7 +125,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+    logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -142,7 +145,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
       val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
       if (amMem > maxMem) {
-        logError("AM size is to large to run on this cluster "  + amMem)
+        logError("AM size is to large to run on this cluster " + amMem)
         System.exit(1)
       }
@@ -307,7 +310,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
@@ -327,7 +330,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
@@ -466,9 +469,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val args = new ClientArguments(argStrings)
+    val sparkConf = new SparkConf
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args).run
+    new Client(args, sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +482,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -486,7 +490,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index e64530702c..ddfec1a4ac 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,12 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
@@ -46,7 +49,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var yarnAllocator: YarnAllocationHandler = _
   private var driverClosed:Boolean = false
 
-  private val sparkConf = new SparkConf
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 4f34bd913e..132630e5ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3f..e91257be8e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
         }
 
         new Thread(
-          new WorkerRunnable(container, conf, driverUrl, workerId,
+          new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
             workerHostname, workerMemory, workerCores)
         ).start()
       }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 7aac2328da..1419f215c7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.MemoryParam
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -36,7 +36,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b..324ef4616f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7c32e0ab9b..69ae14ce83 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,11 +42,14 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+    sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -60,7 +63,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
+        .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("finishApplicationMaster with " + status)
     // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
   /**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
           master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
         }
       }
-    } )
+    })
   }
 
   // Wait for initialization to complete and atleast 'some' nodes can get allocated.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a75066888c..440ad5cde5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,23 +50,25 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
+
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
-
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
   // App files are world-wide readable and owner writable -> rw-r--r--
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
-
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
@@ -143,7 +145,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+    logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -326,7 +328,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
@@ -347,7 +349,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
@@ -482,10 +484,10 @@ object Client {
     // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
+    val sparkConf = new SparkConf()
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    val args = new ClientArguments(argStrings)
-
-    (new Client(args)).run()
+    new Client(args, sparkConf).run()
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -495,7 +497,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -503,7 +505,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 9b898b5829..49248a8516 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,13 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private var appAttemptId: ApplicationAttemptId = _
   private var reporterThread: Thread = _
@@ -47,9 +51,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var driverClosed:Boolean = false
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  private val sparkConf = new SparkConf
-
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
 
   var actor: ActorRef = _
@@ -94,7 +97,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     // must be <= timeoutInterval/ 2.
     // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -140,8 +143,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -170,7 +173,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     // TODO: Handle container failure
 
     yarnAllocator.addResourceRequests(args.numWorkers)
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
@@ -181,7 +184,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -213,7 +216,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 9f5523c4b9..b7699050bb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -39,12 +39,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -197,7 +198,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 8a9a73f5b4..738ff986d8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -280,6 +280,7 @@ private[yarn] class YarnAllocationHandler(
         val workerRunnable = new WorkerRunnable(
           container,
           conf,
+          sparkConf,
           driverUrl,
           workerId,
           workerHostname,
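To make the pattern concrete, here is a minimal, self-contained sketch of the injection style the diff adopts. These are illustrative stand-ins, not the real Spark classes (which also take a Hadoop Configuration and many more arguments); only the SparkConf plumbing is shown.

import org.apache.spark.SparkConf

// Sketch only: stand-ins for the real (much larger) Spark YARN classes.
class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
  // As in the diff: the queue default now comes from the shared conf.
  var amQueue = sparkConf.get("QUEUE", "default")
}

class Client(args: ClientArguments, sparkConf: SparkConf) {
  // Auxiliary constructor kept for callers that do not hold a conf yet,
  // mirroring `def this(args: ClientArguments) = this(args, new SparkConf())`.
  def this(args: ClientArguments) = this(args, new SparkConf())

  def run(): Unit = {
    // Every lookup hits the injected conf, so a caller-supplied setting
    // such as spark.yarn.user.classpath.first is honored.
    val userClasspathFirst =
      sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
    println(s"queue=${args.amQueue} userClasspathFirst=$userClasspathFirst")
  }
}

object ClientSketch {
  def main(argStrings: Array[String]): Unit = {
    // One conf created at the entry point and threaded everywhere,
    // as object Client#main now does.
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new Client(args, sparkConf).run()
  }
}

The design point: the SparkConf is created once at the entry point and passed down, so an embedding caller such as YarnClientSchedulerBackend can hand in the live SparkContext configuration, and the ApplicationMaster can forward sparkContext.getConf to the allocation handler, instead of each YARN class re-reading system properties through its own new SparkConf().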