author    liguoqiang <liguoqiang@rd.tuan800.com>  2014-01-03 15:01:38 +0800
committer liguoqiang <liguoqiang@rd.tuan800.com>  2014-01-03 15:01:38 +0800
commit    010e72c079274cab7c86cbde3bc7fa5c447e2072
tree      dc66141ef654ef240975269fdca110762d31b6f3 /yarn
parent    498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95
Modify Spark on YARN to create a single SparkConf per process and pass it through the deploy classes
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 |  9
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            | 18
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                   |  4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala                    |  7
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala                    |  5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala             |  2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  4
7 files changed, 27 insertions(+), 22 deletions(-)
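
The commit threads one shared SparkConf instance through the YARN deploy classes instead of letting each class construct its own via new SparkConf(). A minimal sketch of the before/after pattern, using hypothetical class names (Before, After); only SparkConf itself is the real API:

    import org.apache.spark.SparkConf

    // Before: each class re-read system properties into a private conf, so
    // settings applied programmatically elsewhere never reached it.
    class Before(args: Array[String]) {
      private val sparkConf = new SparkConf()
    }

    // After: the conf is created once at the entry point and passed down;
    // an auxiliary constructor keeps existing call sites compiling.
    class After(args: Array[String], val sparkConf: SparkConf) {
      def this(args: Array[String]) = this(args, new SparkConf())
    }

The per-file hunks below apply this pattern to ApplicationMaster, Client, ClientArguments, WorkerLauncher, and WorkerRunnable, and update YarnAllocationHandler's WorkerRunnable call to forward the conf.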
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb..69170c7427 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
- private var rpc: YarnRPC = YarnRPC.create(conf)
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+ private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
private var userThread: Thread = _
@@ -57,7 +59,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
- private val sparkConf = new SparkConf()
// Default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97a..525ea72762 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,17 @@ import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
- def this(args: ClientArguments) = this(new Configuration(), args)
+ def this(args: ClientArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+ def this(args: ClientArguments) = this(args, new SparkConf())
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
- private val sparkConf = new SparkConf
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
@@ -307,7 +308,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
@@ -466,9 +467,10 @@ object Client {
// Note that anything with SPARK prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
- val args = new ClientArguments(argStrings)
+ val sparkConf = new SparkConf
+ val args = new ClientArguments(argStrings, sparkConf)
- new Client(args).run
+ new Client(args, sparkConf).run
}
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +480,7 @@ object Client {
}
}
- def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+ def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
@@ -486,7 +488,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+ val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
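
With the new populateClasspath signature, callers pass the shared conf explicitly, so spark.yarn.user.classpath.first is read from the conf the caller actually configured rather than from a fresh SparkConf. A usage sketch matching the signature above; the values are placeholders:

    import scala.collection.mutable.HashMap
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.Client

    val sparkConf = new SparkConf()
    val yarnConf = new YarnConfiguration()
    val env = new HashMap[String, String]()
    // addLog4j = false: no log4j.properties resource is shipped in this sketch
    Client.populateClasspath(yarnConf, sparkConf, false, env)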
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9075ca71e7..09303ae5c2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
@@ -34,7 +34,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amQueue = new SparkConf().get("QUEUE", "default")
+ var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index a8de89c670..1a792ddf66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null
@@ -46,7 +48,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
private var yarnAllocator: YarnAllocationHandler = null
private var driverClosed:Boolean = false
- private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6a90cc51cf..5e5d0421ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
container: Container,
conf: Configuration,
+ sparkConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3f..e91257be8e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
}
new Thread(
- new WorkerRunnable(container, conf, driverUrl, workerId,
+ new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
workerHostname, workerMemory, workerCores)
).start()
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b..324ef4616f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
)
- val args = new ClientArguments(argsArray)
- client = new Client(args)
+ val args = new ClientArguments(argsArray, conf)
+ client = new Client(args, conf)
appId = client.runApp()
waitForApp()
}
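
Taken together, both launch paths now share one conf end to end: in yarn-cluster mode Client.main creates the SparkConf and hands it to ClientArguments and Client, while in yarn-client mode YarnClientSchedulerBackend forwards the driver's existing conf, so settings made on the SparkContext reach the YARN client. A simplified cluster-mode sketch of the flow from the Client.main hunk above, wrapped in a hypothetical Example object:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    object Example {
      def main(argStrings: Array[String]): Unit = {
        // One conf is created and shared by the argument parser and the client.
        val sparkConf = new SparkConf
        val args = new ClientArguments(argStrings, sparkConf)
        new Client(args, sparkConf).run
      }
    }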