author    Colin Patrick McCabe <cmccabe@cloudera.com>  2014-06-08 12:27:34 -0700
committer Patrick Wendell <pwendell@gmail.com>  2014-06-08 12:27:34 -0700
commit    ee96e9406613e621837360b15c34ea7c7220a7a3 (patch)
tree      8039107e31b87a80fabc7efddc6a0c0bb21735a8 /yarn
parent    a338834f90556d78119b37985b603ecee85f97ed (diff)
SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl
https://issues.apache.org/jira/browse/SPARK-1898

Author: Colin Patrick McCabe <cmccabe@cloudera.com>

Closes #850 from cmccabe/master and squashes the following commits:

d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl
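For context, here is a minimal sketch of the pattern this commit adopts: composing a YarnClient obtained from the public factory method rather than inheriting from the non-public YarnClientImpl. The class name ThinYarnSubmitter is hypothetical; the YarnClient calls mirror those in the diff below.

import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical illustration, not part of the commit.
class ThinYarnSubmitter(yarnConf: YarnConfiguration) {
  // The factory returns the stable public API type and hides the impl class.
  val yarnClient = YarnClient.createYarnClient

  def run(): ApplicationId = {
    yarnClient.init(yarnConf)  // wire in the YARN configuration
    yarnClient.start()         // start the underlying client service
    val newApp = yarnClient.createApplication()
    newApp.getNewApplicationResponse().getApplicationId()
  }

  def stop = yarnClient.stop   // parameterless, matching the diff's convention
}

Holding the YarnClient as a field rather than a superclass means Client exposes only the YARN operations it actually needs, so future YARN releases can change YarnClientImpl without affecting Spark's surface.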
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala   2
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  25
2 files changed, 17 insertions, 10 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e01ed5a57d..039cf4f276 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend(
override def stop() {
super.stop()
- client.stop()
+ client.stop
logInfo("Stopped")
}
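The call-site change above follows from the new declaration in Client.scala further down (def stop = yarnClient.stop): Scala only allows a method declared without an empty parameter list to be invoked without parentheses. A minimal, self-contained illustration with hypothetical names:

// Hypothetical example; not from the Spark sources.
object ParenlessDemo extends App {
  class Service {
    def stop = println("stopped")  // declared with no parameter list
  }

  val s = new Service
  s.stop      // compiles and prints "stopped"
  // s.stop() // would not compile: stop takes no parameters
}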
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb42a5..393edd1f2d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
@@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf}
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
*/
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
- extends YarnClientImpl with ClientBase with Logging {
+ extends ClientBase with Logging {
+
+ val yarnClient = YarnClient.createYarnClient
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, new Configuration(), spConf)
@@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def runApp(): ApplicationId = {
validateArgs()
// Initialize and start the client service.
- init(yarnConf)
- start()
+ yarnClient.init(yarnConf)
+ yarnClient.start()
// Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
logClusterResourceDetails()
@@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// interface).
// Get a new client application.
- val newApp = super.createApplication()
+ val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()
@@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
}
def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+ val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
clusterMetrics.getNumNodeManagers)
- val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+ val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
@@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def submitApp(appContext: ApplicationSubmissionContext) = {
// Submit the application to the applications manager.
logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
+ yarnClient.submitApplication(appContext)
}
+ def getApplicationReport(appId: ApplicationId) =
+ yarnClient.getApplicationReport(appId)
+
+ def stop = yarnClient.stop
+
def monitorApplication(appId: ApplicationId): Boolean = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
while (true) {
Thread.sleep(interval)
- val report = super.getApplicationReport(appId)
+ val report = yarnClient.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +