aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala145
1 files changed, 59 insertions, 86 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index aff9ab71f0..5a20532315 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
def this(clientArgs: ClientArguments, spConf: SparkConf) =
@@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
- val args = clientArgs
- val conf = hadoopConf
- val sparkConf = spConf
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
+ /* ------------------------------------------------------------------------------------- *
+ | The following methods have much in common in the stable and alpha versions of Client, |
+ | but cannot be implemented in the parent trait due to subtle API differences across |
+ | hadoop versions. |
+ * ------------------------------------------------------------------------------------- */
- // for client user who want to monitor app status by itself.
- def runApp() = {
- validateArgs()
-
+ /** Submit an application running our ApplicationMaster to the ResourceManager. */
+ override def submitApplication(): ApplicationId = {
init(yarnConf)
start()
- logClusterResourceDetails()
- val newApp = super.getNewApplication()
- val appId = newApp.getApplicationId()
+ logInfo("Requesting a new application from cluster with %d NodeManagers"
+ .format(getYarnClusterMetrics.getNumNodeManagers))
- verifyClusterResources(newApp)
- val appContext = createApplicationSubmissionContext(appId)
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val env = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newApp, localResources, env)
+ // Get a new application from our RM
+ val newAppResponse = getNewApplication()
+ val appId = newAppResponse.getApplicationId()
- val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + memoryOverhead)
- amContainer.setResource(capability)
+ // Verify whether the cluster has enough resources for our AM
+ verifyClusterResources(newAppResponse)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ // Set up the appropriate contexts to launch our AM
+ val containerContext = createContainerLaunchContext(newAppResponse)
+ val appContext = createApplicationSubmissionContext(appId, containerContext)
- submitApp(appContext)
+ // Finally, submit and monitor the application
+ logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ submitApplication(appContext)
appId
}
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got cluster metric info from ASM, numNodeManagers = " +
- clusterMetrics.getNumNodeManagers)
+ /**
+ * Set up a context for launching our ApplicationMaster container.
+ * In the Yarn alpha API, the memory requirements of this container must be set in
+ * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
+ */
+ override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+ : ContainerLaunchContext = {
+ val containerContext = super.createContainerLaunchContext(newAppResponse)
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(args.amMemory + amMemoryOverhead)
+ containerContext.setResource(capability)
+ containerContext
}
-
- def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
- logInfo("Setting up application submission context for ASM")
+ /** Set up the context for submitting our ApplicationMaster. */
+ def createApplicationSubmissionContext(
+ appId: ApplicationId,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(containerContext)
+ appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
appContext
}
- def setupSecurityToken(amContainer: ContainerLaunchContext) = {
- // Setup security tokens.
+ /**
+ * Set up security tokens for launching our ApplicationMaster container.
+ * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
+ */
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
}
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
- }
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = super.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToken: " + report.getClientToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
+ */
+ override def getClientToken(report: ApplicationReport): String =
+ Option(report.getClientToken).getOrElse("")
}
object Client {
-
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
@@ -158,19 +133,17 @@ object Client {
}
// Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
-
val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
- case e: Exception => {
+ case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
- }
}
System.exit(0)