author     Sandy Ryza <sandy@cloudera.com>  2014-12-09 11:02:43 -0800
committer  Andrew Or <andrew@databricks.com>  2014-12-09 11:02:43 -0800
commit     912563aa3553afc0871d5b5858f533aa39cb99e5 (patch)
tree       092241ac4c78deef8053f5095bc9680b3c8532cd /yarn/alpha/src/main/scala/org/apache
parent     383c5555c9f26c080bc9e3a463aab21dd5b3797f (diff)
download   spark-912563aa3553afc0871d5b5858f533aa39cb99e5.tar.gz
           spark-912563aa3553afc0871d5b5858f533aa39cb99e5.tar.bz2
           spark-912563aa3553afc0871d5b5858f533aa39cb99e5.zip
SPARK-4338. [YARN] Ditch yarn-alpha.
Sorry if this is a little premature with 1.2 still not out the door, but it will make other work like SPARK-4136 and SPARK-2089 a lot easier.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3215 from sryza/sandy-spark-4338 and squashes the following commits:

1c5ac08 [Sandy Ryza] Update building Spark docs and remove unnecessary newline
9c1421c [Sandy Ryza] SPARK-4338. Ditch yarn-alpha.
Diffstat (limited to 'yarn/alpha/src/main/scala/org/apache')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                | 145
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala      | 139
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 229
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala      | 118
4 files changed, 0 insertions, 631 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 73b705ba50..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
- */
-@deprecated("use yarn/stable", "1.2.0")
-private[spark] class Client(
- val args: ClientArguments,
- val hadoopConf: Configuration,
- val sparkConf: SparkConf)
- extends YarnClientImpl with ClientBase with Logging {
-
- def this(clientArgs: ClientArguments, spConf: SparkConf) =
- this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
-
- def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
- val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
-
- /* ------------------------------------------------------------------------------------- *
- | The following methods have much in common in the stable and alpha versions of Client, |
- | but cannot be implemented in the parent trait due to subtle API differences across |
- | hadoop versions. |
- * ------------------------------------------------------------------------------------- */
-
- /** Submit an application running our ApplicationMaster to the ResourceManager. */
- override def submitApplication(): ApplicationId = {
- init(yarnConf)
- start()
-
- logInfo("Requesting a new application from cluster with %d NodeManagers"
- .format(getYarnClusterMetrics.getNumNodeManagers))
-
- // Get a new application from our RM
- val newAppResponse = getNewApplication()
- val appId = newAppResponse.getApplicationId()
-
- // Verify whether the cluster has enough resources for our AM
- verifyClusterResources(newAppResponse)
-
- // Set up the appropriate contexts to launch our AM
- val containerContext = createContainerLaunchContext(newAppResponse)
- val appContext = createApplicationSubmissionContext(appId, containerContext)
-
- // Finally, submit and monitor the application
- logInfo(s"Submitting application ${appId.getId} to ResourceManager")
- submitApplication(appContext)
- appId
- }
-
- /**
- * Set up a context for launching our ApplicationMaster container.
- * In the Yarn alpha API, the memory requirements of this container must be set in
- * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
- */
- override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
- : ContainerLaunchContext = {
- val containerContext = super.createContainerLaunchContext(newAppResponse)
- val capability = Records.newRecord(classOf[Resource])
- capability.setMemory(args.amMemory + amMemoryOverhead)
- containerContext.setResource(capability)
- containerContext
- }
-
- /** Set up the context for submitting our ApplicationMaster. */
- def createApplicationSubmissionContext(
- appId: ApplicationId,
- containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
- val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
- appContext.setApplicationId(appId)
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(containerContext)
- appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
- appContext
- }
-
- /**
- * Set up security tokens for launching our ApplicationMaster container.
- * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
- */
- override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- }
-
- /**
- * Return the security token used by this client to communicate with the ApplicationMaster.
- * If no security is enabled, the token returned by the report is null.
- * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
- */
- override def getClientToken(report: ApplicationReport): String =
- Option(report.getClientToken).map(_.toString).getOrElse("")
-}
-
-object Client {
- def main(argStrings: Array[String]) {
- if (!sys.props.contains("SPARK_SUBMIT")) {
- println("WARNING: This client is deprecated and will be removed in a " +
- "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
- }
- println("WARNING: Support for YARN-alpha API's will be removed in Spark 1.3 (see SPARK-3445)")
-
- // Set an env variable indicating we are running in YARN mode.
- // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
- System.setProperty("SPARK_YARN_MODE", "true")
- val sparkConf = new SparkConf
-
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- }
-}
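
For reference, the stable-API counterpart (under yarn/stable) folds these overrides away: the AM memory goes on the ApplicationSubmissionContext rather than the ContainerLaunchContext, and setContainerTokens becomes setTokens. A minimal sketch of that flow, assuming only the Hadoop 2.x stable client API (YarnClient, ApplicationSubmissionContext.setResource); names like StableClientSketch and amMemoryMb are illustrative:

    import java.nio.ByteBuffer

    import org.apache.hadoop.io.DataOutputBuffer
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.hadoop.yarn.api.records._
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records

    object StableClientSketch {
      def submit(yarnConf: YarnConfiguration, amMemoryMb: Int): ApplicationId = {
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(yarnConf)
        yarnClient.start()

        val newApp = yarnClient.createApplication()
        val appContext = newApp.getApplicationSubmissionContext

        // Stable API: the AM memory is set on the submission context, not on the
        // ContainerLaunchContext as in the alpha createContainerLaunchContext above.
        val capability = Records.newRecord(classOf[Resource])
        capability.setMemory(amMemoryMb)
        appContext.setResource(capability)

        // Stable API: setContainerTokens is renamed setTokens.
        val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
        val dob = new DataOutputBuffer()
        UserGroupInformation.getCurrentUser.getCredentials.writeTokenStorageToStream(dob)
        amContainer.setTokens(ByteBuffer.wrap(dob.getData, 0, dob.getLength))
        appContext.setAMContainerSpec(amContainer)

        yarnClient.submitApplication(appContext)
      }
    }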
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
deleted file mode 100644
index 7023a11706..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.util.JavaUtils
-
-@deprecated("use yarn/stable", "1.2.0")
-class ExecutorRunnable(
- container: Container,
- conf: Configuration,
- spConf: SparkConf,
- masterAddress: String,
- slaveId: String,
- hostname: String,
- executorMemory: Int,
- executorCores: Int,
- appAttemptId: String,
- securityMgr: SecurityManager)
- extends Runnable with ExecutorRunnableUtil with Logging {
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- var cm: ContainerManager = _
- val sparkConf = spConf
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def run = {
- logInfo("Starting Executor Container")
- cm = connectToCM
- startContainer
- }
-
- def startContainer = {
- logInfo("Setting up ContainerLaunchContext")
-
- val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- .asInstanceOf[ContainerLaunchContext]
-
- ctx.setContainerId(container.getId())
- ctx.setResource(container.getResource())
- val localResources = prepareLocalResources
- ctx.setLocalResources(localResources)
-
- val env = prepareEnvironment
- ctx.setEnvironment(env)
-
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
- val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appAttemptId, localResources)
- logInfo("Setting up executor with commands: " + commands)
- ctx.setCommands(commands)
-
- ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
-
- // If external shuffle service is enabled, register with the Yarn shuffle service already
- // started on the NodeManager and, if authentication is enabled, provide it with our secret
- // key for fetching shuffle files later
- if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
- val secretString = securityMgr.getSecretKey()
- val secretBytes =
- if (secretString != null) {
- // This conversion must match how the YarnShuffleService decodes our secret
- JavaUtils.stringToBytes(secretString)
- } else {
- // Authentication is not enabled, so just provide dummy metadata
- ByteBuffer.allocate(0)
- }
- ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
- }
-
- // Send the start request to the ContainerManager
- val startReq = Records.newRecord(classOf[StartContainerRequest])
- .asInstanceOf[StartContainerRequest]
- startReq.setContainerLaunchContext(ctx)
- cm.startContainer(startReq)
- }
-
- def connectToCM: ContainerManager = {
- val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
- val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
- logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
- // Use doAs and remoteUser here so we can add the container token and not pollute the current
- // user's credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString())
- val containerToken = container.getContainerToken()
- if (containerToken != null) {
- user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
- }
-
- val proxy = user
- .doAs(new PrivilegedExceptionAction[ContainerManager] {
- def run: ContainerManager = {
- rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
- }
- })
- proxy
- }
-
-}
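
The hand-rolled ContainerManager proxy in connectToCM (tokens, doAs, RPC address) is exactly what the stable API's NMClient encapsulates. A minimal sketch, assuming Hadoop 2.x's org.apache.hadoop.yarn.client.api.NMClient; StableExecutorLaunchSketch is an illustrative name:

    import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
    import org.apache.hadoop.yarn.client.api.NMClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object StableExecutorLaunchSketch {
      def startContainer(
          yarnConf: YarnConfiguration,
          container: Container,
          ctx: ContainerLaunchContext): Unit = {
        // NMClient resolves the NodeManager address from the container and
        // handles the container token and doAs internally, replacing connectToCM.
        val nmClient = NMClient.createNMClient()
        nmClient.init(yarnConf)
        nmClient.start()
        nmClient.startContainer(container, ctx)
      }
    }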
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index abd37834ed..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest
-import org.apache.hadoop.yarn.util.Records
-
-/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
- */
-private[yarn] class YarnAllocationHandler(
- conf: Configuration,
- sparkConf: SparkConf,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
- securityMgr: SecurityManager)
- extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
-
- private val lastResponseId = new AtomicInteger()
- private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
-
- override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
- var resourceRequests: List[ResourceRequest] = null
-
- logDebug("asking for additional executors: " + count + " with already pending: " + pending)
- val totalNumAsk = count + pending
- if (count <= 0) {
- resourceRequests = List()
- } else if (preferredHostToCount.isEmpty) {
- logDebug("host preferences is empty")
- resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
- } else {
- // Request containers on all preferred hosts; the remainder (numExecutors -
- // candidates.size) falls back to the default allocation policy.
- val hostContainerRequests: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests += createResourceRequest(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
- }
- val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
- hostContainerRequests.toList)
-
- val anyContainerRequests: ResourceRequest = createResourceRequest(
- AllocationType.ANY,
- resource = null,
- totalNumAsk,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
- val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
- hostContainerRequests.size + rackContainerRequests.size + 1)
-
- containerRequests ++= hostContainerRequests
- containerRequests ++= rackContainerRequests
- containerRequests += anyContainerRequests
-
- resourceRequests = containerRequests.toList
- }
-
- val req = Records.newRecord(classOf[AllocateRequest])
- req.setResponseId(lastResponseId.incrementAndGet)
- req.setApplicationAttemptId(appAttemptId)
-
- req.addAllAsks(resourceRequests)
-
- val releasedContainerList = createReleasedContainerList()
- req.addAllReleases(releasedContainerList)
-
- if (count > 0) {
- logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
- executorMemory + memoryOverhead))
- } else {
- logDebug("Empty allocation req .. release : " + releasedContainerList)
- }
-
- for (request <- resourceRequests) {
- logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
- format(
- request.getHostName,
- request.getNumContainers,
- request.getPriority,
- request.getCapability))
- }
- new AlphaAllocateResponse(resourceManager.allocate(req).getAMResponse())
- }
-
- override protected def releaseContainer(container: Container) = {
- releaseList.add(container.getId())
- }
-
- private def createRackResourceRequests(hostContainers: List[ResourceRequest]):
- List[ResourceRequest] = {
- // First compute the rack-to-count mapping from the host requests, then issue rack requests.
- val rackToCounts = new HashMap[String, Int]()
-
- // This also reads/writes the rack-related maps.
- for (container <- hostContainers) {
- val candidateHost = container.getHostName
- val candidateNumContainers = container.getNumContainers
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += candidateNumContainers
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts){
- requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
-
- requestedContainers.toList
- }
-
- private def createResourceRequest(
- requestType: AllocationType.AllocationType,
- resource: String,
- numExecutors: Int,
- priority: Int): ResourceRequest = {
-
- // If a hostname is specified, we need at least two requests: node-local and rack-local.
- // There must also be a third, ANY request, which is handled specially.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnSparkHadoopUtil.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
-
- // Add to host->rack mapping
- YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
-
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- createResourceRequestImpl(rack, numExecutors, priority)
- }
- case AllocationType.ANY => createResourceRequestImpl(
- YarnSparkHadoopUtil.ANY_HOST, numExecutors, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
- }
- }
-
- private def createResourceRequestImpl(
- hostname: String,
- numExecutors: Int,
- priority: Int): ResourceRequest = {
-
- val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
- val memCapability = Records.newRecord(classOf[Resource])
- // There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(executorMemory + memoryOverhead)
- rsrcRequest.setCapability(memCapability)
-
- val pri = Records.newRecord(classOf[Priority])
- pri.setPriority(priority)
- rsrcRequest.setPriority(pri)
-
- rsrcRequest.setHostName(hostname)
-
- rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
- rsrcRequest
- }
-
- private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
- val retval = new ArrayBuffer[ContainerId](1)
- // Iterate over the copy-on-write list.
- for (container <- releaseList.iterator()){
- retval += container
- }
- // Remove from the original list.
- if (!retval.isEmpty) {
- releaseList.removeAll(retval)
- logInfo("Releasing " + retval.size + " containers.")
- }
- retval
- }
-
- private class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
- override def getAllocatedContainers() = response.getAllocatedContainers()
- override def getAvailableResources() = response.getAvailableResources()
- override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
- }
-
-}
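
The host/rack/ANY ResourceRequest triple assembled by allocateContainers is what the stable AMRMClient builds internally from a single ContainerRequest per container. A minimal sketch, assuming Hadoop 2.x's AMRMClient; StableAllocationSketch and its parameters are illustrative:

    import scala.collection.JavaConversions._

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records

    object StableAllocationSketch {
      def requestExecutors(
          yarnConf: YarnConfiguration,
          count: Int,
          memoryMb: Int,
          preferredHosts: Array[String]): Unit = {
        val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
        amClient.init(yarnConf)
        amClient.start()

        val capability = Records.newRecord(classOf[Resource])
        capability.setMemory(memoryMb)
        val priority = Priority.newInstance(1)

        // One ContainerRequest per container; AMRMClient expands each into the
        // node-local / rack-local / ANY ResourceRequests built by hand above.
        (1 to count).foreach { _ =>
          amClient.addContainerRequest(
            new ContainerRequest(capability, preferredHosts, null, priority))
        }

        // allocate() also carries any releases registered via
        // releaseAssignedContainer, replacing createReleasedContainerList.
        val allocated = amClient.allocate(0.1f).getAllocatedContainers
        allocated.foreach(c => println("Launched on " + c.getNodeId))
      }
    }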
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
deleted file mode 100644
index e342cc82f4..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.{Map, Set}
-import java.net.URI
-
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.Utils
-
-/**
- * YarnRMClient implementation for the Yarn alpha API.
- */
-private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
-
- private var rpc: YarnRPC = null
- private var resourceManager: AMRMProtocol = _
- private var uiHistoryAddress: String = _
- private var registered: Boolean = false
-
- override def register(
- conf: YarnConfiguration,
- sparkConf: SparkConf,
- preferredNodeLocations: Map[String, Set[SplitInfo]],
- uiAddress: String,
- uiHistoryAddress: String,
- securityMgr: SecurityManager) = {
- this.rpc = YarnRPC.create(conf)
- this.uiHistoryAddress = uiHistoryAddress
-
- synchronized {
- resourceManager = registerWithResourceManager(conf)
- registerApplicationMaster(uiAddress)
- registered = true
- }
-
- new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
- preferredNodeLocations, securityMgr)
- }
-
- override def getAttemptId() = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- appAttemptId
- }
-
- override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
- if (registered) {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(getAttemptId())
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(uiHistoryAddress)
- resourceManager.finishApplicationMaster(finishReq)
- }
- }
-
- override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
- val proxy = YarnConfiguration.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val uriBase = "http://" + proxy + proxyBase
- Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
- }
-
- override def getMaxRegAttempts(conf: YarnConfiguration) =
- conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
-
- private def registerWithResourceManager(conf: YarnConfiguration): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(uiAddress: String): RegisterApplicationMasterResponse = {
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(getAttemptId())
- // Set this to the master host/port so that the ApplicationReport on the
- // client side has some sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- // Remove the scheme from the URL if present, since Hadoop does not expect one.
- val uri = new URI(uiAddress)
- val authority = if (uri.getScheme == null) uiAddress else uri.getAuthority
- appMasterRequest.setTrackingUrl(authority)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
-}
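
Registration follows the same shape in the stable API, minus the manual AMRMProtocol proxy: registerWithResourceManager and registerApplicationMaster collapse into one AMRMClient call, and unregister becomes unregisterApplicationMaster. A minimal sketch under that assumption; StableRMClientSketch is an illustrative name:

    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object StableRMClientSketch {
      def run(yarnConf: YarnConfiguration, host: String, trackingUrl: String): Unit = {
        val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
        amClient.init(yarnConf)
        amClient.start()

        // The RM scheduler address is read from yarnConf; host/port/trackingUrl
        // populate the ApplicationReport, as in registerApplicationMaster above.
        amClient.registerApplicationMaster(host, 0, trackingUrl)

        // ... allocate containers and run the application ...

        amClient.unregisterApplicationMaster(
          FinalApplicationStatus.SUCCEEDED, "", trackingUrl)
      }
    }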