aboutsummaryrefslogtreecommitdiff
path: root/new-yarn
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
commit642029e7f43322f84abe4f7f36bb0b1b95d8101d (patch)
treecef080193815b279b99a8b35f2401873a3ea3eb1 /new-yarn
parent2573add94cf920a88f74d80d8ea94218d812704d (diff)
downloadspark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.gz
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.bz2
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.zip
Various fixes to configuration code
- Got rid of global SparkContext.globalConf - Pass SparkConf to serializers and compression codecs - Made SparkConf public instead of private[spark] - Improved API of SparkContext and SparkConf - Switched executor environment vars to be passed through SparkConf - Fixed some places that were still using system properties - Fixed some tests, though others are still failing This still fails several tests in core, repl and streaming, likely due to properties not being set or cleared correctly (some of the tests run fine in isolation).
Diffstat (limited to 'new-yarn')
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala44
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala38
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
3 files changed, 42 insertions, 42 deletions
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 433268a1dd..91e35e2d34 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
+
private var rpc: YarnRPC = YarnRPC.create(conf)
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var appAttemptId: ApplicationAttemptId = _
@@ -81,12 +81,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
// org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
+
ApplicationMaster.register(this)
// Start the user's JAR
userThread = startUserClass()
-
+
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
@@ -99,7 +99,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Allocate all containers
allocateWorkers()
- // Wait for the user class to Finish
+ // Wait for the user class to Finish
userThread.join()
System.exit(0)
@@ -119,7 +119,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
localDirs
}
-
+
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
@@ -128,17 +128,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("ApplicationAttemptId: " + appAttemptId)
appAttemptId
}
-
+
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
}
-
+
private def waitForSparkMaster() {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while (!driverUp && tries < numTries) {
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
@@ -199,7 +199,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var numTries = 0
val waitTime = 10000L
- val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
@@ -214,7 +214,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
yarnConf,
amClient,
appAttemptId,
- args,
+ args,
sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
@@ -265,7 +265,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -314,11 +314,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
for (container <- containers) {
logInfo("Launching shell command on a new container."
+ ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
+ + ", containerNode=" + container.getNodeId().getHost()
+ ":" + container.getNodeId().getPort()
+ ", containerNodeURI=" + container.getNodeHttpAddress()
+ ", containerState" + container.getState()
- + ", containerResourceMemory"
+ + ", containerResourceMemory"
+ container.getResource().getMemory())
}
}
@@ -338,12 +338,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
/**
- * Clean up the staging directory.
+ * Clean up the staging directory.
*/
- private def cleanupStagingDir() {
+ private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
@@ -359,7 +359,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
def run() {
@@ -415,18 +415,18 @@ object ApplicationMaster {
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem is already shutdown.
if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
// Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
- }
+ }
} )
}
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a322f60864..963b5b88be 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
-import org.apache.spark.Logging
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
@@ -150,7 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getChildQueues.size))
}
- def verifyClusterResources(app: GetNewApplicationResponse) = {
+ def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
@@ -221,7 +221,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ }
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
@@ -244,7 +244,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -269,7 +269,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
}
}
@@ -283,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val destPath = copyRemoteFile(dst, localPath, replication)
// Only add the resource to the Spark ApplicationMaster.
val appMasterOnly = true
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, appMasterOnly)
}
}
@@ -295,7 +295,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache)
}
}
@@ -307,7 +307,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
linkname, statCache)
}
}
@@ -317,7 +317,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
def setupLaunchEnv(
- localResources: HashMap[String, LocalResource],
+ localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment")
val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
@@ -406,11 +406,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val commands = List[String](
- javaCommand +
+ javaCommand +
" -server " +
JAVA_OPTS +
" " + args.amClass +
- " --class " + args.userClass +
+ " --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
" --worker-memory " + args.workerMemory +
@@ -436,7 +436,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
super.submitApplication(appContext)
}
- def monitorApplication(appId: ApplicationId): Boolean = {
+ def monitorApplication(appId: ApplicationId): Boolean = {
while (true) {
Thread.sleep(1000)
val report = super.getApplicationReport(appId)
@@ -458,7 +458,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
- if (state == YarnApplicationState.FINISHED ||
+ if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return true
@@ -495,25 +495,25 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
Client.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
}
}
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 41ac292249..1a9bb97b3e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024 // MB
var workerCores = 1
var numWorkers = 2
- var amQueue = conf.getOrElse("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"