author     Patrick Wendell <pwendell@gmail.com>  2014-01-01 21:29:12 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2014-01-01 21:29:12 -0800
commit     3713f8129a618a633a7aca8c944960c3e7ac9d3b (patch)
tree       ff3aa8fa3460078007259a6a6479dc4aec27b50a /new-yarn
parent     c1d928a897f8daed5d7e74f4af476b67046f348d (diff)
parent     7e8d2e8a5c88d16c771923504c433491b109ab2a (diff)
Merge pull request #309 from mateiz/conf2
SPARK-544. Migrate configuration to a SparkConf class

This is still a work in progress based on Prashant and Evan's code. So far I've done the following:

- Got rid of global SparkContext.globalConf
- Passed SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing

This still fails several tests in core, repl and streaming, likely due to properties not being set or cleared correctly (some of the tests run fine in isolation). But the API at least is hopefully ready for review. Unfortunately there was a lot of global stuff before due to a "SparkContext.globalConf" method that let you set a "default" configuration of sorts, which meant I had to make some pretty big changes.
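For orientation, the hunks below all apply one recurring pattern: configuration reads and writes that previously went through JVM-global system properties now go through a conf object. The following is a minimal sketch of that pattern, assuming a SparkConf exposing the getOrElse/get/set calls seen in the hunks (the API of this branch, not necessarily later Spark releases); the object and method names in the sketch are illustrative only.

import org.apache.spark.SparkConf

// Sketch of the migration this PR applies across the YARN code.
// The SparkConf method signatures are assumed from their usage in the
// diff below (getOrElse/get/set); everything else here is illustrative.
object ConfMigrationSketch {

  // Old style: a JVM-global property read, hard to isolate in tests.
  def maxWorkerFailuresBefore(): Int =
    System.getProperty("spark.yarn.max.worker.failures", "3").toInt

  // New style: the same lookup, scoped to a SparkConf passed in by the caller.
  def maxWorkerFailuresAfter(conf: SparkConf): Int =
    conf.getOrElse("spark.yarn.max.worker.failures", "3").toInt

  // Writes follow the same shape: set on the conf instead of System.setProperty.
  def setDriverEndpoint(conf: SparkConf, host: String, port: Int): Unit = {
    conf.set("spark.driver.host", host)
    conf.set("spark.driver.port", port.toString)
  }
}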
Diffstat (limited to 'new-yarn')
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                52
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                           40
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                   2
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala                    4
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala             4
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala   4
6 files changed, 53 insertions(+), 53 deletions(-)
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eeeca3ea8a..91e35e2d34 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
+
private var rpc: YarnRPC = YarnRPC.create(conf)
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var appAttemptId: ApplicationAttemptId = _
@@ -61,13 +61,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var amClient: AMRMClient[ContainerRequest] = _
// Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
+ conf.set("spark.local.dir", getLocalDirs())
// Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -81,12 +81,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
// org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
+
ApplicationMaster.register(this)
// Start the user's JAR
userThread = startUserClass()
-
+
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
@@ -99,7 +99,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Allocate all containers
allocateWorkers()
- // Wait for the user class to Finish
+ // Wait for the user class to Finish
userThread.join()
System.exit(0)
@@ -119,7 +119,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
localDirs
}
-
+
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
@@ -128,20 +128,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("ApplicationAttemptId: " + appAttemptId)
appAttemptId
}
-
+
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
}
-
+
private def waitForSparkMaster() {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while (!driverUp && tries < numTries) {
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
@@ -199,7 +199,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var numTries = 0
val waitTime = 10000L
- val maxNumTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
@@ -214,7 +214,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
yarnConf,
amClient,
appAttemptId,
- args,
+ args,
sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
@@ -265,7 +265,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -314,11 +314,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
for (container <- containers) {
logInfo("Launching shell command on a new container."
+ ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
+ + ", containerNode=" + container.getNodeId().getHost()
+ ":" + container.getNodeId().getPort()
+ ", containerNodeURI=" + container.getNodeHttpAddress()
+ ", containerState" + container.getState()
- + ", containerResourceMemory"
+ + ", containerResourceMemory"
+ container.getResource().getMemory())
}
}
@@ -338,12 +338,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
/**
- * Clean up the staging directory.
+ * Clean up the staging directory.
*/
- private def cleanupStagingDir() {
+ private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
@@ -359,7 +359,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
def run() {
@@ -415,18 +415,18 @@ object ApplicationMaster {
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem is already shutdown.
if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
// Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
- }
+ }
} )
}
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9fdee29498..1bba6a5ae4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
-import org.apache.spark.Logging
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
@@ -150,7 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getChildQueues.size))
}
- def verifyClusterResources(app: GetNewApplicationResponse) = {
+ def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
@@ -221,7 +221,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ }
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
@@ -244,7 +244,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -269,7 +269,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
}
}
@@ -283,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val destPath = copyRemoteFile(dst, localPath, replication)
// Only add the resource to the Spark ApplicationMaster.
val appMasterOnly = true
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, appMasterOnly)
}
}
@@ -295,7 +295,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache)
}
}
@@ -307,7 +307,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
linkname, statCache)
}
}
@@ -317,7 +317,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
def setupLaunchEnv(
- localResources: HashMap[String, LocalResource],
+ localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment")
val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
@@ -406,11 +406,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val commands = List[String](
- javaCommand +
+ javaCommand +
" -server " +
JAVA_OPTS +
" " + args.amClass +
- " --class " + args.userClass +
+ " --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
" --worker-memory " + args.workerMemory +
@@ -436,8 +436,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
super.submitApplication(appContext)
}
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = System.getProperty("spark.yarn.report.interval", "1000").toLong
+ def monitorApplication(appId: ApplicationId): Boolean = {
+ val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
while (true) {
Thread.sleep(interval)
@@ -460,7 +460,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
- if (state == YarnApplicationState.FINISHED ||
+ if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return true
@@ -497,25 +497,25 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
Client.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
}
}
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 70be15d0a3..1a9bb97b3e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024 // MB
var workerCores = 1
var numWorkers = 2
- var amQueue = System.getProperty("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index bc31bb2eb0..f7d73f0d83 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -136,8 +136,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100)
}
}
- System.setProperty("spark.driver.host", driverHost)
- System.setProperty("spark.driver.port", driverPort.toString)
+ conf.set("spark.driver.host", driverHost)
+ conf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 784a3112de..abc3447746 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -254,8 +254,8 @@ private[yarn] class YarnAllocationHandler(
} else {
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 350fc760a4..4b69f5078b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,8 +53,8 @@ private[spark] class YarnClientSchedulerBackend(
if (workerNumber == null)
workerNumber = defaultWorkerNumber
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](