diff options
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 145 |
1 files changed, 59 insertions, 86 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index aff9ab71f0..5a20532315 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.YarnClientImpl import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, Records} +import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil @@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil /** * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API. */ -class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf) +private[spark] class Client( + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) extends YarnClientImpl with ClientBase with Logging { def this(clientArgs: ClientArguments, spConf: SparkConf) = @@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) - val args = clientArgs - val conf = hadoopConf - val sparkConf = spConf - var rpc: YarnRPC = YarnRPC.create(conf) - val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf) + /* ------------------------------------------------------------------------------------- * + | The following methods have much in common in the stable and alpha versions of Client, | + | but cannot be implemented in the parent trait due to subtle API differences across | + | hadoop versions. | + * ------------------------------------------------------------------------------------- */ - // for client user who want to monitor app status by itself. - def runApp() = { - validateArgs() - + /** Submit an application running our ApplicationMaster to the ResourceManager. */ + override def submitApplication(): ApplicationId = { init(yarnConf) start() - logClusterResourceDetails() - val newApp = super.getNewApplication() - val appId = newApp.getApplicationId() + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(getYarnClusterMetrics.getNumNodeManagers)) - verifyClusterResources(newApp) - val appContext = createApplicationSubmissionContext(appId) - val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) - val env = setupLaunchEnv(localResources, appStagingDir) - val amContainer = createContainerLaunchContext(newApp, localResources, env) + // Get a new application from our RM + val newAppResponse = getNewApplication() + val appId = newAppResponse.getApplicationId() - val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] - // Memory for the ApplicationMaster. - capability.setMemory(args.amMemory + memoryOverhead) - amContainer.setResource(capability) + // Verify whether the cluster has enough resources for our AM + verifyClusterResources(newAppResponse) - appContext.setQueue(args.amQueue) - appContext.setAMContainerSpec(amContainer) - appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(appId, containerContext) - submitApp(appContext) + // Finally, submit and monitor the application + logInfo(s"Submitting application ${appId.getId} to ResourceManager") + submitApplication(appContext) appId } - def run() { - val appId = runApp() - monitorApplication(appId) - } - - def logClusterResourceDetails() { - val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics - logInfo("Got cluster metric info from ASM, numNodeManagers = " + - clusterMetrics.getNumNodeManagers) + /** + * Set up a context for launching our ApplicationMaster container. + * In the Yarn alpha API, the memory requirements of this container must be set in + * the ContainerLaunchContext instead of the ApplicationSubmissionContext. + */ + override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) + : ContainerLaunchContext = { + val containerContext = super.createContainerLaunchContext(newAppResponse) + val capability = Records.newRecord(classOf[Resource]) + capability.setMemory(args.amMemory + amMemoryOverhead) + containerContext.setResource(capability) + containerContext } - - def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = { - logInfo("Setting up application submission context for ASM") + /** Set up the context for submitting our ApplicationMaster. */ + def createApplicationSubmissionContext( + appId: ApplicationId, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) + appContext.setQueue(args.amQueue) + appContext.setAMContainerSpec(containerContext) + appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName) appContext } - def setupSecurityToken(amContainer: ContainerLaunchContext) = { - // Setup security tokens. + /** + * Set up security tokens for launching our ApplicationMaster container. + * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API. + */ + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData())) } - def submitApp(appContext: ApplicationSubmissionContext) = { - // Submit the application to the applications manager. - logInfo("Submitting application to ASM") - super.submitApplication(appContext) - } - - def monitorApplication(appId: ApplicationId): Boolean = { - val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) - - while (true) { - Thread.sleep(interval) - val report = super.getApplicationReport(appId) - - logInfo("Application report from ASM: \n" + - "\t application identifier: " + appId.toString() + "\n" + - "\t appId: " + appId.getId() + "\n" + - "\t clientToken: " + report.getClientToken() + "\n" + - "\t appDiagnostics: " + report.getDiagnostics() + "\n" + - "\t appMasterHost: " + report.getHost() + "\n" + - "\t appQueue: " + report.getQueue() + "\n" + - "\t appMasterRpcPort: " + report.getRpcPort() + "\n" + - "\t appStartTime: " + report.getStartTime() + "\n" + - "\t yarnAppState: " + report.getYarnApplicationState() + "\n" + - "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" + - "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" + - "\t appUser: " + report.getUser() - ) - - val state = report.getYarnApplicationState() - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return true - } - } - true - } + /** + * Return the security token used by this client to communicate with the ApplicationMaster. + * If no security is enabled, the token returned by the report is null. + * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API. + */ + override def getClientToken(report: ApplicationReport): String = + Option(report.getClientToken).getOrElse("") } object Client { - def main(argStrings: Array[String]) { if (!sys.props.contains("SPARK_SUBMIT")) { println("WARNING: This client is deprecated and will be removed in a " + @@ -158,19 +133,17 @@ object Client { } // Set an env variable indicating we are running in YARN mode. - // Note that anything with SPARK prefix gets propagated to all (remote) processes + // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") - val sparkConf = new SparkConf try { val args = new ClientArguments(argStrings, sparkConf) new Client(args, sparkConf).run() } catch { - case e: Exception => { + case e: Exception => Console.err.println(e.getMessage) System.exit(1) - } } System.exit(0) |