aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala56
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala50
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
3 files changed, 54 insertions, 54 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1dd38dd13e..dc9228180f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,7 +43,7 @@ import org.apache.spark.util.Utils
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
+
private var rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
@@ -68,7 +68,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
+
appAttemptId = getApplicationAttemptId()
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
@@ -92,11 +92,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// }
//}
// org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
+
ApplicationMaster.register(this)
// Start the user's JAR
userThread = startUserClass()
-
+
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
@@ -105,11 +105,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
+
// Allocate all containers
allocateWorkers()
-
- // Wait for the user class to Finish
+
+ // Wait for the user class to Finish
userThread.join()
System.exit(0)
@@ -129,7 +129,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
localDirs
}
-
+
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
@@ -138,7 +138,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("ApplicationAttemptId: " + appAttemptId)
appAttemptId
}
-
+
private def registerWithResourceManager(): AMRMProtocol = {
val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
@@ -146,26 +146,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Connecting to ResourceManager at " + rmAddress)
rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}
-
+
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some
- // sensible info.
+ // sensible info.
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
appMasterRequest.setTrackingUrl(uiAddress)
resourceManager.registerApplicationMaster(appMasterRequest)
}
-
+
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while(!driverUp && tries < numTries) {
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
@@ -226,7 +226,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
val waitTime = 10000L
- val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
@@ -241,8 +241,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
yarnConf,
resourceManager,
appAttemptId,
- args,
- sparkContext.preferredNodeLocationData)
+ args,
+ sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
format(count * waitTime, numTries))
@@ -294,7 +294,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -342,11 +342,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
for (container <- containers) {
logInfo("Launching shell command on a new container."
+ ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
+ + ", containerNode=" + container.getNodeId().getHost()
+ ":" + container.getNodeId().getPort()
+ ", containerNodeURI=" + container.getNodeHttpAddress()
+ ", containerState" + container.getState()
- + ", containerResourceMemory"
+ + ", containerResourceMemory"
+ container.getResource().getMemory())
}
}
@@ -372,12 +372,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
/**
- * Clean up the staging directory.
+ * Clean up the staging directory.
*/
- private def cleanupStagingDir() {
+ private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
@@ -393,7 +393,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
def run() {
@@ -446,18 +446,18 @@ object ApplicationMaster {
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem is already shutdown.
if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
// Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
- }
+ }
} )
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 29892e98e3..cc150888eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
-import org.apache.spark.Logging
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
@@ -59,7 +59,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
// App files are world-wide readable and owner writable -> rw-r--r--
- val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
+ val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
// for client user who want to monitor app status by itself.
def runApp() = {
@@ -103,7 +103,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
- ).foreach { case(cond, errStr) =>
+ ).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
args.printUsageAndExit(1)
@@ -130,7 +130,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getChildQueues.size))
}
- def verifyClusterResources(app: GetNewApplicationResponse) = {
+ def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
@@ -146,7 +146,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// We could add checks to make sure the entire cluster has enough resources but that involves
- // getting all the node reports and computing ourselves
+ // getting all the node reports and computing ourselves
}
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
@@ -207,7 +207,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ }
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
@@ -230,7 +230,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -241,7 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
+ Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -253,7 +253,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
}
}
@@ -265,7 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, true)
}
}
@@ -277,7 +277,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache)
}
}
@@ -289,7 +289,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
linkname, statCache)
}
}
@@ -299,7 +299,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
def setupLaunchEnv(
- localResources: HashMap[String, LocalResource],
+ localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment")
val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
@@ -354,7 +354,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
- JAVA_OPTS += " -Djava.io.tmpdir=" +
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove
@@ -387,11 +387,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- val commands = List[String](javaCommand +
+ val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" " + args.amClass +
- " --class " + args.userClass +
+ " --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
" --worker-memory " + args.workerMemory +
@@ -421,7 +421,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
super.submitApplication(appContext)
}
- def monitorApplication(appId: ApplicationId): Boolean = {
+ def monitorApplication(appId: ApplicationId): Boolean = {
while (true) {
Thread.sleep(1000)
val report = super.getApplicationReport(appId)
@@ -443,7 +443,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
- if (state == YarnApplicationState.FINISHED ||
+ if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return true
@@ -461,7 +461,7 @@ object Client {
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
- conf.set("SPARK_YARN_MODE", "true")
+ System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
@@ -479,25 +479,25 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
Client.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 617289f568..e9e46a193b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,7 +33,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amQueue = conf.getOrElse("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"