Diffstat (limited to 'yarn/src/main')
6 files changed, 65 insertions, 63 deletions
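Most of the hunks below apply one recurring change: configuration values that were previously read and written through JVM-wide system properties (System.getProperty / System.setProperty) now go through the conf object (conf.getOrElse / conf.set). A minimal, self-contained sketch of that before/after pattern follows; the Conf class here is a hypothetical stand-in for illustration only, not the real SparkConf API:

import scala.collection.mutable

// Hypothetical stand-in for the conf object used by the patch; only the two
// calls the diff relies on (set, and getOrElse with a string default) are modeled.
class Conf {
  private val settings = mutable.HashMap[String, String]()

  def set(key: String, value: String): Conf = {
    settings(key) = value
    this
  }

  def getOrElse(key: String, default: String): String =
    settings.getOrElse(key, default)
}

object ConfMigrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Conf

    // Before: a JVM-wide system property, visible to everything in the process.
    val oldStyle = System.getProperty("spark.yarn.max.worker.failures", "3").toInt

    // After: a value scoped to this application's conf object, as in the diff.
    conf.set("spark.yarn.max.worker.failures", "6")
    val newStyle = conf.getOrElse("spark.yarn.max.worker.failures", "3").toInt

    println(s"system-property value: $oldStyle, conf-based value: $newStyle")
  }
}

The practical difference is scoping: a system property is global to the JVM, while a conf object is owned by the application that created it.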
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 240ed8b32a..dc9228180f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,7 +43,7 @@ import org.apache.spark.util.Utils
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {

   def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-  
+
   private var rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
   private var appAttemptId: ApplicationAttemptId = _
@@ -58,17 +58,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
   // default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+  private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3).toString()).toInt

   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
+    conf.set("spark.local.dir", getLocalDirs())

     // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-    
+
     appAttemptId = getApplicationAttemptId()
     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     resourceManager = registerWithResourceManager()
@@ -92,11 +92,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // }
     //}
     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-    
+
     ApplicationMaster.register(this)
     // Start the user's JAR
     userThread = startUserClass()
-    
+
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
     waitForSparkMaster()
@@ -105,11 +105,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Do this after spark master is up and SparkContext is created so that we can register UI Url
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-    
+
     // Allocate all containers
     allocateWorkers()
-    
-    // Wait for the user class to Finish     
+
+    // Wait for the user class to Finish
     userThread.join()

     System.exit(0)
@@ -129,7 +129,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
     localDirs
   }
-  
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
@@ -138,7 +138,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("ApplicationAttemptId: " + appAttemptId)
     appAttemptId
   }
-  
+
   private def registerWithResourceManager(): AMRMProtocol = {
     val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
       YarnConfiguration.RM_SCHEDULER_ADDRESS,
@@ -146,29 +146,29 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("Connecting to ResourceManager at " + rmAddress)
     rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
   }
-  
+
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     logInfo("Registering the ApplicationMaster")
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
     // Setting this to master host,port - so that the ApplicationReport at client has some
-    // sensible info. 
+    // sensible info.
     // Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     appMasterRequest.setTrackingUrl(uiAddress)
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
-  
+
   private def waitForSparkMaster() {
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
     while(!driverUp && tries < numTries) {
-      val driverHost = System.getProperty("spark.driver.host")
-      val driverPort = System.getProperty("spark.driver.port")
+      val driverHost = conf.get("spark.driver.host")
+      val driverPort = conf.get("spark.driver.port")
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
@@ -226,7 +226,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     ApplicationMaster.sparkContextRef.synchronized {
       var count = 0
       val waitTime = 10000L
-      val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+      val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
       while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
         logInfo("Waiting for spark context initialization ... " + count)
         count = count + 1
@@ -241,8 +241,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
           yarnConf,
           resourceManager,
           appAttemptId,
-          args, 
-          sparkContext.preferredNodeLocationData) 
+          args,
+          sparkContext.preferredNodeLocationData)
       } else {
         logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
           format(count * waitTime, numTries))
@@ -294,7 +294,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // we want to be reasonably responsive without causing too many requests to RM.
     val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+      conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong

     // must be <= timeoutInterval / 2.
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -342,11 +342,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       for (container <- containers) {
         logInfo("Launching shell command on a new container."
           + ", containerId=" + container.getId()
-          + ", containerNode=" + container.getNodeId().getHost() 
+          + ", containerNode=" + container.getNodeId().getHost()
           + ":" + container.getNodeId().getPort()
           + ", containerNodeURI=" + container.getNodeHttpAddress()
           + ", containerState" + container.getState()
-          + ", containerResourceMemory" 
+          + ", containerResourceMemory"
           + container.getResource().getMemory())
       }
     }
@@ -372,12 +372,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   }

   /**
-   * Clean up the staging directory. 
+   * Clean up the staging directory.
    */
-  private def cleanupStagingDir() { 
+  private def cleanupStagingDir() {
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {
@@ -393,7 +393,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }

-  // The shutdown hook that runs when a signal is received AND during normal close of the JVM. 
+  // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
   class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {

     def run() {
@@ -446,18 +446,18 @@ object ApplicationMaster {
     // Note that this will unfortunately not properly clean up the staging files because it gets
     // called too late, after the filesystem is already shutdown.
     if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread with Logging { 
+      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
-        override def run() { 
-          logInfo("Invoking sc stop from shutdown hook") 
-          sc.stop() 
+        override def run() {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
           // Best case ...
          for (master <- applicationMasters) {
            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
          }
-        } 
+        }
       } )
     }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79dd038065..595a7ee8c3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}

-import org.apache.spark.Logging 
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil

@@ -59,7 +59,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)

   // App files are world-wide readable and owner writable -> rw-r--r--
-  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) 
+  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)

   // for client user who want to monitor app status by itself.
   def runApp() = {
@@ -103,7 +103,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
       (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
         "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
-    ).foreach { case(cond, errStr) => 
+    ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
         args.printUsageAndExit(1)
@@ -130,7 +130,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       queueInfo.getChildQueues.size))
   }

-  def verifyClusterResources(app: GetNewApplicationResponse) = { 
+  def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)

@@ -146,7 +146,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     }

     // We could add checks to make sure the entire cluster has enough resources but that involves
-    // getting all the node reports and computing ourselves 
+    // getting all the node reports and computing ourselves
   }

   def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
@@ -207,7 +207,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
       fs.setReplication(newPath, replication)
       if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
-    } 
+    }
     // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
     // version shows the specific version in the distributed cache configuration
     val qualPath = fs.makeQualified(newPath)
@@ -230,7 +230,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+    val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort

     if (UserGroupInformation.isSecurityEnabled()) {
       val dstFs = dst.getFileSystem(conf)
@@ -241,7 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val statCache: Map[URI, FileStatus] =
       HashMap[URI, FileStatus]()

-    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, 
+    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -253,7 +253,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
       val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
       val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
-      distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+      distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
         destName, statCache)
     }
   }
@@ -265,7 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           linkname, statCache, true)
       }
     }
@@ -277,7 +277,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           linkname, statCache)
       }
     }
@@ -289,7 +289,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
           linkname, statCache)
       }
     }
@@ -299,7 +299,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }

   def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource], 
+      localResources: HashMap[String, LocalResource],
       stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment")
     val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
@@ -354,7 +354,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Add Xmx for am memory
     JAVA_OPTS += "-Xmx" + amMemory + "m "

-    JAVA_OPTS += " -Djava.io.tmpdir=" + 
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "

     // Commenting it out for now - so that people can refer to the properties if required. Remove
@@ -387,11 +387,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }

-    val commands = List[String](javaCommand + 
+    val commands = List[String](javaCommand +
       " -server " +
       JAVA_OPTS +
       " " + args.amClass +
-      " --class " + args.userClass + 
+      " --class " + args.userClass +
       " --jar " + args.userJar +
       userArgsToString(args) +
       " --worker-memory " + args.workerMemory +
@@ -421,9 +421,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     super.submitApplication(appContext)
   }

-  def monitorApplication(appId: ApplicationId): Boolean = { 
+  def monitorApplication(appId: ApplicationId): Boolean = {
+    val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
+
     while (true) {
-      Thread.sleep(1000)
+      Thread.sleep(interval)
       val report = super.getApplicationReport(appId)

       logInfo("Application report from ASM: \n" +
@@ -443,7 +445,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       val state = report.getYarnApplicationState()
       val dsStatus = report.getFinalApplicationStatus()
-      if (state == YarnApplicationState.FINISHED || 
+      if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
         return true
@@ -479,25 +481,25 @@ object Client {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
     }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + SPARK_JAR)
     Client.populateHadoopClasspath(conf, env)

     if (!userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
     }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + "*")
   }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b3a7886d93..e9e46a193b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,7 +33,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = System.getProperty("QUEUE", "default")
+  var amQueue = conf.getOrElse("QUEUE", "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 69038844bb..c1e79cbe66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -162,8 +162,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    System.setProperty("spark.driver.host", driverHost)
-    System.setProperty("spark.driver.port", driverPort.toString)
+    conf.set("spark.driver.host", driverHost)
+    conf.set("spark.driver.port", driverPort.toString)

     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a01657c9fa..5966a0f757 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -239,7 +239,7 @@ private[yarn] class YarnAllocationHandler(
           // (workerIdCounter)
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+            conf.get("spark.driver.host"), conf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)

           logInfo("launching container on " + containerId + " host " + workerHostname)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 350fc760a4..4b69f5078b 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,8 +53,8 @@ private[spark] class YarnClientSchedulerBackend(
     if (workerNumber == null)
       workerNumber = defaultWorkerNumber

-    val driverHost = System.getProperty("spark.driver.host")
-    val driverPort = System.getProperty("spark.driver.port")
+    val driverHost = conf.get("spark.driver.host")
+    val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort

     val argsArray = Array[String](
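One behavioral change worth noting in Client.scala above: monitorApplication no longer sleeps a hard-coded 1000 ms between application reports but reads the delay from spark.yarn.report.interval. A small sketch of that polling-loop shape, assuming a hypothetical getReport helper in place of YARN's real getApplicationReport:

object ReportPollingSketch {
  // Hypothetical application states standing in for YarnApplicationState.
  sealed trait AppState
  case object Running extends AppState
  case object Finished extends AppState

  private var ticks = 0

  // Hypothetical report fetcher; the real code calls super.getApplicationReport(appId).
  private def getReport(): AppState = {
    ticks += 1
    if (ticks >= 3) Finished else Running
  }

  // Poll until a terminal state, sleeping `interval` ms between reports,
  // mirroring the loop shape of monitorApplication after the patch.
  def monitor(interval: Long): Unit = {
    var done = false
    while (!done) {
      Thread.sleep(interval)
      val state = getReport()
      println(s"Application report: $state")
      done = state == Finished // the real code also stops on FAILED / KILLED
    }
  }

  def main(args: Array[String]): Unit = {
    // In the patch the interval comes from configuration:
    //   val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
    val interval = 100L
    monitor(interval)
  }
}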