diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-28 17:13:15 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-28 17:13:15 -0500 |
commit | 642029e7f43322f84abe4f7f36bb0b1b95d8101d (patch) | |
tree | cef080193815b279b99a8b35f2401873a3ea3eb1 /new-yarn | |
parent | 2573add94cf920a88f74d80d8ea94218d812704d (diff) | |
download | spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.gz spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.bz2 spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.zip |
Various fixes to configuration code
- Got rid of global SparkContext.globalConf
- Pass SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing
This still fails several tests in core, repl and streaming, likely due
to properties not being set or cleared correctly (some of the tests run
fine in isolation).
Diffstat (limited to 'new-yarn')
3 files changed, 42 insertions, 42 deletions
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 433268a1dd..91e35e2d34 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -45,7 +45,7 @@ import org.apache.spark.util.Utils class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { def this(args: ApplicationMasterArguments) = this(args, new Configuration()) - + private var rpc: YarnRPC = YarnRPC.create(conf) private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private var appAttemptId: ApplicationAttemptId = _ @@ -81,12 +81,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Workaround until hadoop moves to something which has // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf) - + ApplicationMaster.register(this) // Start the user's JAR userThread = startUserClass() - + // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. waitForSparkMaster() @@ -99,7 +99,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Allocate all containers allocateWorkers() - // Wait for the user class to Finish + // Wait for the user class to Finish userThread.join() System.exit(0) @@ -119,7 +119,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } localDirs } - + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) @@ -128,17 +128,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e logInfo("ApplicationAttemptId: " + appAttemptId) appAttemptId } - + private def registerApplicationMaster(): RegisterApplicationMasterResponse = { logInfo("Registering the ApplicationMaster") amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) } - + private def waitForSparkMaster() { logInfo("Waiting for Spark driver to be reachable.") var driverUp = false var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt + val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt while (!driverUp && tries < numTries) { val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") @@ -199,7 +199,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var numTries = 0 val waitTime = 10000L - val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { logInfo("Waiting for Spark context initialization ... " + numTries) numTries = numTries + 1 @@ -214,7 +214,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, amClient, appAttemptId, - args, + args, sparkContext.preferredNodeLocationData) } else { logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d". @@ -265,7 +265,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -314,11 +314,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e for (container <- containers) { logInfo("Launching shell command on a new container." + ", containerId=" + container.getId() - + ", containerNode=" + container.getNodeId().getHost() + + ", containerNode=" + container.getNodeId().getHost() + ":" + container.getNodeId().getPort() + ", containerNodeURI=" + container.getNodeHttpAddress() + ", containerState" + container.getState() - + ", containerResourceMemory" + + ", containerResourceMemory" + container.getResource().getMemory()) } } @@ -338,12 +338,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } /** - * Clean up the staging directory. + * Clean up the staging directory. */ - private def cleanupStagingDir() { + private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { @@ -359,7 +359,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } - // The shutdown hook that runs when a signal is received AND during normal close of the JVM. + // The shutdown hook that runs when a signal is received AND during normal close of the JVM. class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { def run() { @@ -415,18 +415,18 @@ object ApplicationMaster { // Note that this will unfortunately not properly clean up the staging files because it gets // called too late, after the filesystem is already shutdown. if (modified) { - Runtime.getRuntime().addShutdownHook(new Thread with Logging { + Runtime.getRuntime().addShutdownHook(new Thread with Logging { // This is not only logs, but also ensures that log system is initialized for this instance // when we are actually 'run'-ing. logInfo("Adding shutdown hook for context " + sc) - override def run() { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() + override def run() { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() // Best case ... for (master <- applicationMasters) { master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } - } + } } ) } diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a322f60864..963b5b88be 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.Logging import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -150,7 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl queueInfo.getChildQueues.size)) } - def verifyClusterResources(app: GetNewApplicationResponse) = { + def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) @@ -221,7 +221,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf) fs.setReplication(newPath, replication) if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) - } + } // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific // version shows the specific version in the distributed cache configuration val qualPath = fs.makeQualified(newPath) @@ -244,7 +244,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -269,7 +269,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } val setPermissions = if (destName.equals(Client.APP_JAR)) true else false val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions) - distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, destName, statCache) } } @@ -283,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val destPath = copyRemoteFile(dst, localPath, replication) // Only add the resource to the Spark ApplicationMaster. val appMasterOnly = true - distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, linkname, statCache, appMasterOnly) } } @@ -295,7 +295,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val localPath = new Path(localURI) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyRemoteFile(dst, localPath, replication) - distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, linkname, statCache) } } @@ -307,7 +307,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val localPath = new Path(localURI) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyRemoteFile(dst, localPath, replication) - distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, linkname, statCache) } } @@ -317,7 +317,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def setupLaunchEnv( - localResources: HashMap[String, LocalResource], + localResources: HashMap[String, LocalResource], stagingDir: String): HashMap[String, String] = { logInfo("Setting up the launch environment") val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null) @@ -406,11 +406,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } val commands = List[String]( - javaCommand + + javaCommand + " -server " + JAVA_OPTS + " " + args.amClass + - " --class " + args.userClass + + " --class " + args.userClass + " --jar " + args.userJar + userArgsToString(args) + " --worker-memory " + args.workerMemory + @@ -436,7 +436,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl super.submitApplication(appContext) } - def monitorApplication(appId: ApplicationId): Boolean = { + def monitorApplication(appId: ApplicationId): Boolean = { while (true) { Thread.sleep(1000) val report = super.getApplicationReport(appId) @@ -458,7 +458,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val state = report.getYarnApplicationState() val dsStatus = report.getFinalApplicationStatus() - if (state == YarnApplicationState.FINISHED || + if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { return true @@ -495,25 +495,25 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) // If log4j present, ensure ours overrides all others if (addLog4j) { - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") + val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) } - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + SPARK_JAR) Client.populateHadoopClasspath(conf, env) if (!userClasspathFirst) { - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) } - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + "*") } } diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 41ac292249..1a9bb97b3e 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 // MB var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = conf.getOrElse("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" |