author     Mridul Muralidharan <mridul@gmail.com>  2013-04-24 02:31:57 +0530
committer  Mridul Muralidharan <mridul@gmail.com>  2013-04-24 02:31:57 +0530
commit     8faf5c51c3ea0b3ad83418552b50db596fefc558 (patch)
tree       601250c385684ab233320550ba2883b7a6e9bbda
parent     b11058f42c1c9c66ea94d3732c2efbdb57cb42b6 (diff)
Patch from Thomas Graves to improve the YARN Client, and move to a more production-ready Hadoop YARN branch
-rw-r--r--  core/pom.xml                                                5
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala  72
-rw-r--r--  pom.xml                                                     9
-rw-r--r--  project/SparkBuild.scala                                    5
-rw-r--r--  repl-bin/pom.xml                                            5
5 files changed, 30 insertions, 66 deletions
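In short, this patch swaps the hand-rolled ClientRMProtocol RPC plumbing in Client.scala for the YarnClientImpl helper shipped in the new hadoop-yarn-client artifact, and pins the build to the hardened 0.23.7 branch. A minimal sketch of the submission pattern the patch adopts, assuming the 0.23.x client API (the class name SketchClient and its body are illustrative, not the patch's actual Client):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext}
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

// Illustrative only: extending YarnClientImpl removes the manual
// ClientRMProtocol proxy and the per-call request-record plumbing.
class SketchClient(conf: Configuration) extends YarnClientImpl {
  val yarnConf = new YarnConfiguration(conf)

  def run(): ApplicationId = {
    init(yarnConf)   // service lifecycle inherited from YarnClientImpl
    start()          // connects to the ResourceManager

    // One call instead of building a GetNewApplicationRequest by hand.
    val newApp: GetNewApplicationResponse = super.getNewApplication()
    val appId = newApp.getApplicationId()

    // Fill in the submission context (launch context, queue, resources, ...)
    // exactly as the real Client does, then hand it to the inherited method.
    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
    appContext.setApplicationId(appId)
    super.submitApplication(appContext)
    appId
  }
}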
diff --git a/core/pom.xml b/core/pom.xml
index 9baa447662..7f65ce5c00 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -297,6 +297,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index c007dae98c..7a881e26df 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -7,6 +7,7 @@ import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import scala.collection.mutable.HashMap
@@ -16,19 +17,19 @@ import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import spark.deploy.SparkHadoopUtil
-class Client(conf: Configuration, args: ClientArguments) extends Logging {
+class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
def this(args: ClientArguments) = this(new Configuration(), args)
- var applicationsManager: ClientRMProtocol = null
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run() {
- connectToASM()
+ init(yarnConf)
+ start()
logClusterResourceDetails()
- val newApp = getNewApplication()
+ val newApp = super.getNewApplication()
val appId = newApp.getApplicationId()
verifyClusterResources(newApp)
@@ -47,64 +48,17 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
System.exit(0)
}
-
- def connectToASM() {
- val rmAddress: InetSocketAddress = NetUtils.createSocketAddr(
- yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
- )
- logInfo("Connecting to ResourceManager at" + rmAddress)
- applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf)
- .asInstanceOf[ClientRMProtocol]
- }
def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = getYarnClusterMetrics
+ val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
-/*
- val clusterNodeReports: List[NodeReport] = getNodeReports
- logDebug("Got Cluster node info from ASM")
- for (node <- clusterNodeReports) {
- logDebug("Got node report from ASM for, nodeId=" + node.getNodeId + ", nodeAddress=" + node.getHttpAddress +
- ", nodeRackName=" + node.getRackName + ", nodeNumContainers=" + node.getNumContainers + ", nodeHealthStatus=" + node.getNodeHealthStatus)
- }
-*/
-
- val queueInfo: QueueInfo = getQueueInfo(args.amQueue)
+ val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
- def getYarnClusterMetrics: YarnClusterMetrics = {
- val request: GetClusterMetricsRequest = Records.newRecord(classOf[GetClusterMetricsRequest])
- val response: GetClusterMetricsResponse = applicationsManager.getClusterMetrics(request)
- return response.getClusterMetrics
- }
-
- def getNodeReports: List[NodeReport] = {
- val request: GetClusterNodesRequest = Records.newRecord(classOf[GetClusterNodesRequest])
- val response: GetClusterNodesResponse = applicationsManager.getClusterNodes(request)
- return response.getNodeReports.toList
- }
-
- def getQueueInfo(queueName: String): QueueInfo = {
- val request: GetQueueInfoRequest = Records.newRecord(classOf[GetQueueInfoRequest])
- request.setQueueName(queueName)
- request.setIncludeApplications(true)
- request.setIncludeChildQueues(false)
- request.setRecursive(false)
- Records.newRecord(classOf[GetQueueInfoRequest])
- return applicationsManager.getQueueInfo(request).getQueueInfo
- }
-
- def getNewApplication(): GetNewApplicationResponse = {
- logInfo("Requesting new Application")
- val request = Records.newRecord(classOf[GetNewApplicationRequest])
- val response = applicationsManager.getNewApplication(request)
- logInfo("Got new ApplicationId: " + response.getApplicationId())
- return response
- }
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -265,23 +219,15 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
}
def submitApp(appContext: ApplicationSubmissionContext) = {
- // Create the request to send to the applications manager
- val appRequest = Records.newRecord(classOf[SubmitApplicationRequest])
- .asInstanceOf[SubmitApplicationRequest]
- appRequest.setApplicationSubmissionContext(appContext)
// Submit the application to the applications manager
logInfo("Submitting application to ASM")
- applicationsManager.submitApplication(appRequest)
+ super.submitApplication(appContext)
}
def monitorApplication(appId: ApplicationId): Boolean = {
while(true) {
Thread.sleep(1000)
- val reportRequest = Records.newRecord(classOf[GetApplicationReportRequest])
- .asInstanceOf[GetApplicationReportRequest]
- reportRequest.setApplicationId(appId)
- val reportResponse = applicationsManager.getApplicationReport(reportRequest)
- val report = reportResponse.getApplicationReport()
+ val report = super.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +
diff --git a/pom.xml b/pom.xml
index ecbfaf9b47..0e95520d50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -564,7 +564,9 @@
<id>hadoop2-yarn</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
- <yarn.version>2.0.2-alpha</yarn.version>
+ <!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
+ <yarn.version>0.23.7</yarn.version>
+ <!-- <yarn.version>2.0.2-alpha</yarn.version> -->
</properties>
<repositories>
@@ -599,6 +601,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
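With the hadoop2-yarn profile now carrying the hadoop-yarn-client dependency, Maven builds against the hardened 0.23.x branch would presumably be selected by activating that profile explicitly, for example mvn -Phadoop2-yarn package (the profile id is the one defined above; the exact goals are a usage assumption, not part of the patch).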
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 0a5b89d927..819e940403 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -20,7 +20,7 @@ object SparkBuild extends Build {
//val HADOOP_YARN = false
// For Hadoop 2 YARN support
- val HADOOP_VERSION = "2.0.2-alpha"
+ val HADOOP_VERSION = "0.23.7"
val HADOOP_MAJOR_VERSION = "2"
val HADOOP_YARN = true
@@ -156,7 +156,8 @@ object SparkBuild extends Build {
Seq(
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
"org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION,
- "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION
+ "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION,
+ "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION
)
} else {
Seq(
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index b66d193b5d..46f38c2772 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -201,6 +201,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>runtime</scope>
+ </dependency>
</dependencies>
</profile>
<profile>