aboutsummaryrefslogtreecommitdiff
path: root/core/src/hadoop2-yarn/scala
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-04-24 02:31:57 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-04-24 02:31:57 +0530
commit8faf5c51c3ea0b3ad83418552b50db596fefc558 (patch)
tree601250c385684ab233320550ba2883b7a6e9bbda /core/src/hadoop2-yarn/scala
parentb11058f42c1c9c66ea94d3732c2efbdb57cb42b6 (diff)
downloadspark-8faf5c51c3ea0b3ad83418552b50db596fefc558.tar.gz
spark-8faf5c51c3ea0b3ad83418552b50db596fefc558.tar.bz2
spark-8faf5c51c3ea0b3ad83418552b50db596fefc558.zip
Patch from Thomas Graves to improve the YARN Client, and move to more production ready hadoop yarn branch
Diffstat (limited to 'core/src/hadoop2-yarn/scala')
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala72
1 files changed, 9 insertions, 63 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index c007dae98c..7a881e26df 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -7,6 +7,7 @@ import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import scala.collection.mutable.HashMap
@@ -16,19 +17,19 @@ import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import spark.deploy.SparkHadoopUtil
-class Client(conf: Configuration, args: ClientArguments) extends Logging {
+class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
def this(args: ClientArguments) = this(new Configuration(), args)
- var applicationsManager: ClientRMProtocol = null
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run() {
- connectToASM()
+ init(yarnConf)
+ start()
logClusterResourceDetails()
- val newApp = getNewApplication()
+ val newApp = super.getNewApplication()
val appId = newApp.getApplicationId()
verifyClusterResources(newApp)
@@ -47,64 +48,17 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
System.exit(0)
}
-
- def connectToASM() {
- val rmAddress: InetSocketAddress = NetUtils.createSocketAddr(
- yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
- )
- logInfo("Connecting to ResourceManager at" + rmAddress)
- applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf)
- .asInstanceOf[ClientRMProtocol]
- }
def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = getYarnClusterMetrics
+ val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
-/*
- val clusterNodeReports: List[NodeReport] = getNodeReports
- logDebug("Got Cluster node info from ASM")
- for (node <- clusterNodeReports) {
- logDebug("Got node report from ASM for, nodeId=" + node.getNodeId + ", nodeAddress=" + node.getHttpAddress +
- ", nodeRackName=" + node.getRackName + ", nodeNumContainers=" + node.getNumContainers + ", nodeHealthStatus=" + node.getNodeHealthStatus)
- }
-*/
-
- val queueInfo: QueueInfo = getQueueInfo(args.amQueue)
+ val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
- def getYarnClusterMetrics: YarnClusterMetrics = {
- val request: GetClusterMetricsRequest = Records.newRecord(classOf[GetClusterMetricsRequest])
- val response: GetClusterMetricsResponse = applicationsManager.getClusterMetrics(request)
- return response.getClusterMetrics
- }
-
- def getNodeReports: List[NodeReport] = {
- val request: GetClusterNodesRequest = Records.newRecord(classOf[GetClusterNodesRequest])
- val response: GetClusterNodesResponse = applicationsManager.getClusterNodes(request)
- return response.getNodeReports.toList
- }
-
- def getQueueInfo(queueName: String): QueueInfo = {
- val request: GetQueueInfoRequest = Records.newRecord(classOf[GetQueueInfoRequest])
- request.setQueueName(queueName)
- request.setIncludeApplications(true)
- request.setIncludeChildQueues(false)
- request.setRecursive(false)
- Records.newRecord(classOf[GetQueueInfoRequest])
- return applicationsManager.getQueueInfo(request).getQueueInfo
- }
-
- def getNewApplication(): GetNewApplicationResponse = {
- logInfo("Requesting new Application")
- val request = Records.newRecord(classOf[GetNewApplicationRequest])
- val response = applicationsManager.getNewApplication(request)
- logInfo("Got new ApplicationId: " + response.getApplicationId())
- return response
- }
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -265,23 +219,15 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
}
def submitApp(appContext: ApplicationSubmissionContext) = {
- // Create the request to send to the applications manager
- val appRequest = Records.newRecord(classOf[SubmitApplicationRequest])
- .asInstanceOf[SubmitApplicationRequest]
- appRequest.setApplicationSubmissionContext(appContext)
// Submit the application to the applications manager
logInfo("Submitting application to ASM")
- applicationsManager.submitApplication(appRequest)
+ super.submitApplication(appContext)
}
def monitorApplication(appId: ApplicationId): Boolean = {
while(true) {
Thread.sleep(1000)
- val reportRequest = Records.newRecord(classOf[GetApplicationReportRequest])
- .asInstanceOf[GetApplicationReportRequest]
- reportRequest.setApplicationId(appId)
- val reportResponse = applicationsManager.getApplicationReport(reportRequest)
- val report = reportResponse.getApplicationReport()
+ val report = super.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +