author     Prashant Sharma <prashant.s@imaginea.com>  2013-12-24 18:30:31 +0530
committer  Prashant Sharma <scrapcodes@gmail.com>     2013-12-25 00:09:36 +0530
commit     2573add94cf920a88f74d80d8ea94218d812704d (patch)
tree       9eb07c85cadbaea90a8e9742687adda924342b42 /yarn
parent     0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
spark-544, introducing SparkConf and related configuration overhaul.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                  16
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                              6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                     2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala                      4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala               2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala    4
6 files changed, 17 insertions(+), 17 deletions(-)
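
The change applied throughout this diff is uniform: reads of JVM-wide system properties (System.getProperty) become reads on a conf object passed into each class, and writes (System.setProperty) become conf.set. The sketch below illustrates that call pattern in isolation; SketchConf, its method signatures, and the example keys and values are assumptions made only for illustration, not the SparkConf class this commit introduces.

// Minimal sketch of a SparkConf-like settings holder. It only mirrors the
// calls visible in this diff (set / get / getOrElse); it is not Spark code.
import scala.collection.mutable

class SketchConf {
  private val settings = mutable.HashMap[String, String]()

  // Mirrors conf.set("spark.driver.host", ...) in WorkerLauncher.scala.
  def set(key: String, value: String): SketchConf = {
    settings(key) = value
    this
  }

  // Mirrors conf.get("spark.driver.host"): fails if the key was never set.
  def get(key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))

  // Mirrors conf.getOrElse("spark.yarn.max.worker.failures", default).
  def getOrElse(key: String, default: String): String =
    settings.getOrElse(key, default)
}

object SketchConfExample extends App {
  val conf = new SketchConf
  conf.set("spark.driver.host", "10.0.0.1")
  conf.set("spark.driver.port", "7077")

  // Old style (removed by the diff): System.getProperty("spark.driver.host")
  // New style (added by the diff):   conf.get("spark.driver.host")
  val driverUrl = "akka.tcp://spark@%s:%s/user/driver".format(
    conf.get("spark.driver.host"), conf.get("spark.driver.port"))
  val waitTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
  println(s"driverUrl=$driverUrl waitTries=$waitTries")
}

One practical difference, and presumably part of the motivation for the overhaul, is scoping: system properties are global to the JVM, whereas a conf object travels with the component that owns it, so each class below can read configuration through the value it already receives.
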
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 240ed8b32a..1dd38dd13e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -58,13 +58,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
// default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
+ conf.set("spark.local.dir", getLocalDirs())
// Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -165,10 +165,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while(!driverUp && tries < numTries) {
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
@@ -226,7 +226,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
val waitTime = 10000L
- val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
@@ -294,7 +294,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -377,7 +377,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79dd038065..29892e98e3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -230,7 +230,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -461,7 +461,7 @@ object Client {
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
- System.setProperty("SPARK_YARN_MODE", "true")
+ conf.set("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
@@ -483,7 +483,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b3a7886d93..617289f568 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,7 +33,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amQueue = System.getProperty("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 69038844bb..c1e79cbe66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -162,8 +162,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100)
}
}
- System.setProperty("spark.driver.host", driverHost)
- System.setProperty("spark.driver.port", driverPort.toString)
+ conf.set("spark.driver.host", driverHost)
+ conf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9ab2073529..4c9fee5695 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -239,7 +239,7 @@ private[yarn] class YarnAllocationHandler(
// (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780c78..6feaaff014 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,8 +52,8 @@ private[spark] class YarnClientSchedulerBackend(
if (workerNumber == null)
workerNumber = defaultWorkerNumber
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](