From ee96e9406613e621837360b15c34ea7c7220a7a3 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Sun, 8 Jun 2014 12:27:34 -0700 Subject: SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl https://issues.apache.org/jira/browse/SPARK-1898 Author: Colin Patrick McCabe Closes #850 from cmccabe/master and squashes the following commits: d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl --- .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 25 ++++++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) (limited to 'yarn') diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e01ed5a57d..039cf4f276 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend( override def stop() { super.stop() - client.stop() + client.stop logInfo("Stopped") } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1b6bfb42a5..393edd1f2d 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl +import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} @@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf} * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. */ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf) - extends YarnClientImpl with ClientBase with Logging { + extends ClientBase with Logging { + + val yarnClient = YarnClient.createYarnClient def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, new Configuration(), spConf) @@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def runApp(): ApplicationId = { validateArgs() // Initialize and start the client service. - init(yarnConf) - start() + yarnClient.init(yarnConf) + yarnClient.start() // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers). logClusterResourceDetails() @@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa // interface). // Get a new client application. - val newApp = super.createApplication() + val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() val appId = newAppResponse.getApplicationId() @@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa } def logClusterResourceDetails() { - val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics + val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " + clusterMetrics.getNumNodeManagers) - val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) + val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue) logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s, queueApplicationCount = %s, queueChildQueueCount = %s""".format( queueInfo.getQueueName, @@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def submitApp(appContext: ApplicationSubmissionContext) = { // Submit the application to the applications manager. logInfo("Submitting application to ASM") - super.submitApplication(appContext) + yarnClient.submitApplication(appContext) } + def getApplicationReport(appId: ApplicationId) = + yarnClient.getApplicationReport(appId) + + def stop = yarnClient.stop + def monitorApplication(appId: ApplicationId): Boolean = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) - val report = super.getApplicationReport(appId) + val report = yarnClient.getApplicationReport(appId) logInfo("Application report from ASM: \n" + "\t application identifier: " + appId.toString() + "\n" + -- cgit v1.2.3