-rw-r--r--  .gitignore                                                                                   2
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala              26
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                         26
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala                  8
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala                  5
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala           2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  4
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala             24
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                        28
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala                23
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala                 5
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala          1
13 files changed, 92 insertions(+), 66 deletions(-)
diff --git a/.gitignore b/.gitignore
index b3c4363af0..399362f7d3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
*~
*.swp
+*.ipr
*.iml
+*.iws
.idea/
.settings
.cache
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb..2bb11e54c5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,15 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+ sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
- private var rpc: YarnRPC = YarnRPC.create(conf)
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+ private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
private var userThread: Thread = _
@@ -57,7 +61,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
- private val sparkConf = new SparkConf()
// Default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
@@ -125,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
+ .getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
@@ -164,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
resourceManager.registerApplicationMaster(appMasterRequest)
}
- private def startUserClass(): Thread = {
+ private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
args.userClass,
- false /* initialize */,
+ false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
@@ -230,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
yarnConf,
resourceManager,
appAttemptId,
- args,
+ args,
sparkContext.getConf)
}
}
@@ -285,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -384,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
+
}
object ApplicationMaster {
@@ -393,6 +397,7 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -431,6 +436,7 @@ object ApplicationMaster {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
+
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
@@ -439,7 +445,7 @@ object ApplicationMaster {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
- } )
+ })
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated.
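Note: the rewritten constructors above replace the internally built `new SparkConf()` with an injected instance, while auxiliary constructors keep the old entry points working. A minimal, self-contained sketch of the same injection pattern; class names here are illustrative stand-ins, not the Spark types:

```scala
// Stand-ins for Configuration and SparkConf, only so the sketch compiles alone.
class HadoopConf
class Conf

class AppMaster(val args: Array[String], val hadoopConf: HadoopConf, val sparkConf: Conf) {
  // Callers holding a SparkConf get a default Hadoop Configuration, as in the diff.
  def this(args: Array[String], sparkConf: Conf) = this(args, new HadoopConf, sparkConf)
  // Legacy single-argument entry point builds a fresh conf, like the old code did.
  def this(args: Array[String]) = this(args, new Conf)
}

object InjectionDemo extends App {
  val shared = new Conf
  val am = new AppMaster(Array.empty[String], shared)
  assert(am.sparkConf eq shared)  // the master now reads the caller's conf
}
```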
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97a..6abb4d5017 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,19 @@ import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+ extends YarnClientImpl with Logging {
- def this(args: ClientArguments) = this(new Configuration(), args)
+ def this(args: ClientArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
+
+ def this(args: ClientArguments) = this(args, new SparkConf())
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
- private val sparkConf = new SparkConf
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -122,7 +125,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+ logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
@@ -142,7 +145,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
if (amMem > maxMem) {
- logError("AM size is to large to run on this cluster " + amMem)
+ logError("AM size is to large to run on this cluster " + amMem)
System.exit(1)
}
@@ -307,7 +310,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
@@ -327,7 +330,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val prefix = " --args "
val args = clientArgs.userArgs
val retval = new StringBuilder()
- for (arg <- args){
+ for (arg <- args) {
retval.append(prefix).append(" '").append(arg).append("' ")
}
retval.toString
@@ -466,9 +469,10 @@ object Client {
// Note that anything with SPARK prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
- val args = new ClientArguments(argStrings)
+ val sparkConf = new SparkConf
+ val args = new ClientArguments(argStrings, sparkConf)
- new Client(args).run
+ new Client(args, sparkConf).run
}
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +482,7 @@ object Client {
}
}
- def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+ def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
@@ -486,7 +490,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+ val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
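With sparkConf threaded into populateClasspath, the ordering switch spark.yarn.user.classpath.first is read from the caller's configuration rather than from a fresh SparkConf that only sees defaults. A simplified, runnable sketch of that ordering decision; the env-map update stands in for the Hadoop Apps.addToEnvironment calls, and everything except the config key is illustrative:

```scala
import scala.collection.mutable

object ClasspathSketch {
  val SPARK_JAR = "spark.jar"
  val APP_JAR = "app.jar"

  // Simplified stand-in for Client.populateClasspath: appends entries to a
  // CLASSPATH variable, putting the user's jar first only when asked to.
  def populateClasspath(conf: Map[String, String],
                        env: mutable.HashMap[String, String]): Unit = {
    def add(entry: String): Unit =
      env("CLASSPATH") = env.get("CLASSPATH").map(_ + ":" + entry).getOrElse(entry)

    val userClasspathFirst =
      conf.getOrElse("spark.yarn.user.classpath.first", "false").toBoolean
    if (userClasspathFirst) { add(APP_JAR); add(SPARK_JAR) }
    else { add(SPARK_JAR); add(APP_JAR) }
  }

  def main(args: Array[String]): Unit = {
    val env = new mutable.HashMap[String, String]()
    populateClasspath(Map("spark.yarn.user.classpath.first" -> "true"), env)
    println(env("CLASSPATH"))  // app.jar:spark.jar
  }
}
```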
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index e64530702c..ddfec1a4ac 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,12 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+ extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = _
@@ -46,7 +49,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
- private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 4f34bd913e..132630e5ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
container: Container,
conf: Configuration,
+ sparkConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3f..e91257be8e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
}
new Thread(
- new WorkerRunnable(container, conf, driverUrl, workerId,
+ new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
workerHostname, workerMemory, workerCores)
).start()
}
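The two changes above pair up: WorkerRunnable gains a sparkConf parameter, and YarnAllocationHandler, its caller, supplies it when launching a container. A minimal sketch of that caller/callee threading; the types and argument list are heavily simplified stand-ins (the real WorkerRunnable also takes the container, hostname, memory, and cores):

```scala
class Conf

// Callee: receives the shared SparkConf instead of constructing its own.
class WorkerRunnable(conf: Conf, sparkConf: Conf,
                     driverUrl: String, workerId: String) extends Runnable {
  override def run(): Unit =
    println(s"worker $workerId connecting to $driverUrl with the shared conf")
}

object AllocatorSketch extends App {
  val hadoopConf = new Conf
  val sparkConf = new Conf
  // Caller: threads sparkConf through, mirroring the YarnAllocationHandler change.
  new Thread(new WorkerRunnable(hadoopConf, sparkConf, "akka://driver", "1")).start()
}
```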
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 7aac2328da..1419f215c7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.MemoryParam
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
@@ -36,7 +36,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024 // MB
var workerCores = 1
var numWorkers = 2
- var amQueue = new SparkConf().get("QUEUE", "default")
+ var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
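ClientArguments now takes the SparkConf it should read from, so amQueue reflects the caller's QUEUE setting instead of whatever a throwaway new SparkConf() would load. A hedged usage sketch against the real classes; it assumes this era's yarn module on the classpath, and the --jar/--class values are only there to satisfy the parser:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

object QueueDemo {
  def main(argv: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("QUEUE", "analytics")
    val args = new ClientArguments(
      Array("--jar", "app.jar", "--class", "Main"), sparkConf)
    println(args.amQueue)  // "analytics", not the "default" fallback
  }
}
```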
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b..324ef4616f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
)
- val args = new ClientArguments(argsArray)
- client = new Client(args)
+ val args = new ClientArguments(argsArray, conf)
+ client = new Client(args, conf)
appId = client.runApp()
waitForApp()
}
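Here the backend hands the SparkContext's own conf to both ClientArguments and Client, which is what makes user-set properties visible to the code paths above. A tiny sketch of the difference; spark.yarn.user.classpath.first is the property the Client diff reads:

```scala
import org.apache.spark.SparkConf

object PropagationSketch extends App {
  // The user sets a property on the conf that the SparkContext carries.
  val conf = new SparkConf().set("spark.yarn.user.classpath.first", "true")

  // Before this patch, Client did `new SparkConf().get(...)` and so read only
  // defaults (assuming the property isn't also set as a JVM system property):
  println(new SparkConf().get("spark.yarn.user.classpath.first", "false")) // false
  // With the shared instance threaded through, the same lookup sees the value:
  println(conf.get("spark.yarn.user.classpath.first", "false"))            // true
}
```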
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7c32e0ab9b..69ae14ce83 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,11 +42,14 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+ sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
+
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
- private var rpc: YarnRPC = YarnRPC.create(conf)
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var appAttemptId: ApplicationAttemptId = _
private var userThread: Thread = _
@@ -60,7 +63,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
- private val sparkConf = new SparkConf()
// Default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
+ .getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
}
- private def startUserClass(): Thread = {
+ private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
args.userClass,
- false /* initialize */,
+ false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("finishApplicationMaster with " + status)
// Set tracking URL to empty since we don't have a history server.
- amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
/**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
+
}
object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
+
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
- } )
+ })
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a75066888c..440ad5cde5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,23 +50,25 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
* which will launch a Spark master process and negotiate resources throughout its duration.
*/
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+ extends YarnClientImpl with Logging {
+
+ def this(args: ClientArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
+
+ def this(args: ClientArguments) = this(args, new SparkConf())
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
- private val sparkConf = new SparkConf
-
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
// App files are world-wide readable and owner writable -> rw-r--r--
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
- def this(args: ClientArguments) = this(new Configuration(), args)
-
def runApp(): ApplicationId = {
validateArgs()
// Initialize and start the client service.
@@ -143,7 +145,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+ logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
@@ -326,7 +328,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
@@ -347,7 +349,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val prefix = " --args "
val args = clientArgs.userArgs
val retval = new StringBuilder()
- for (arg <- args){
+ for (arg <- args) {
retval.append(prefix).append(" '").append(arg).append("' ")
}
retval.toString
@@ -482,10 +484,10 @@ object Client {
// Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
// see Client#setupLaunchEnv().
System.setProperty("SPARK_YARN_MODE", "true")
+ val sparkConf = new SparkConf()
+ val args = new ClientArguments(argStrings, sparkConf)
- val args = new ClientArguments(argStrings)
-
- (new Client(args)).run()
+ new Client(args, sparkConf).run()
}
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -495,7 +497,7 @@ object Client {
}
}
- def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+ def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
@@ -503,7 +505,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 9b898b5829..49248a8516 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,13 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+ extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
+
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
private var appAttemptId: ApplicationAttemptId = _
private var reporterThread: Thread = _
@@ -47,9 +51,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
private var driverClosed:Boolean = false
private var amClient: AMRMClient[ContainerRequest] = _
- private val sparkConf = new SparkConf
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
var actor: ActorRef = _
@@ -94,7 +97,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
@@ -140,8 +143,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100)
}
}
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -170,7 +173,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numWorkers)
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
@@ -181,7 +184,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -213,7 +216,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("finish ApplicationMaster with " + status)
- amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
}
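One behavioral detail in this file: the driver's host and port are now recorded on the injected sparkConf, so the settings outlive WorkerLauncher instead of dying with a private copy, and the driver actor URL is derived from them. A sketch with placeholder values; the real host/port are discovered once the driver registers, and "CoarseGrainedScheduler" stands in for CoarseGrainedSchedulerBackend.ACTOR_NAME:

```scala
import org.apache.spark.SparkConf

object DriverUrlSketch extends App {
  val sparkConf = new SparkConf()
  // Illustrative endpoint; WorkerLauncher learns these values at runtime.
  sparkConf.set("spark.driver.host", "driver-host.example.com")
  sparkConf.set("spark.driver.port", "48291")

  val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    sparkConf.get("spark.driver.host"),
    sparkConf.get("spark.driver.port"),
    "CoarseGrainedScheduler")
  println(driverUrl)
  // akka.tcp://spark@driver-host.example.com:48291/user/CoarseGrainedScheduler
}
```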
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 9f5523c4b9..b7699050bb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -39,12 +39,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
container: Container,
conf: Configuration,
+ sparkConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
@@ -197,7 +198,7 @@ class WorkerRunnable(
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 8a9a73f5b4..738ff986d8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -280,6 +280,7 @@ private[yarn] class YarnAllocationHandler(
val workerRunnable = new WorkerRunnable(
container,
conf,
+ sparkConf,
driverUrl,
workerId,
workerHostname,