author    Sandy Ryza <sandy@cloudera.com>    2014-12-09 11:02:43 -0800
committer Andrew Or <andrew@databricks.com>  2014-12-09 11:02:43 -0800
commit    912563aa3553afc0871d5b5858f533aa39cb99e5 (patch)
tree      092241ac4c78deef8053f5095bc9680b3c8532cd /yarn/stable
parent    383c5555c9f26c080bc9e3a463aab21dd5b3797f (diff)
SPARK-4338. [YARN] Ditch yarn-alpha.
Sorry if this is a little premature with 1.2 still not out the door, but it will
make other work like SPARK-4136 and SPARK-2089 a lot easier.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3215 from sryza/sandy-spark-4338 and squashes the following commits:

1c5ac08 [Sandy Ryza] Update building Spark docs and remove unnecessary newline
9c1421c [Sandy Ryza] SPARK-4338. Ditch yarn-alpha.
Diffstat (limited to 'yarn/stable')
-rw-r--r--  yarn/stable/pom.xml                                                                   95
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                 141
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala       113
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  213
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala       110
-rw-r--r--  yarn/stable/src/test/resources/log4j.properties                                       28
-rw-r--r--  yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala       189
7 files changed, 0 insertions, 889 deletions
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
deleted file mode 100644
index 8b6521ad7f..0000000000
--- a/yarn/stable/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>yarn-parent_2.10</artifactId>
- <version>1.3.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <properties>
- <sbt.project.name>yarn-stable</sbt.project.name>
- </properties>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project YARN Stable API</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <!--
- See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
- dependencies, so they need to be added manually for the tests to work.
- -->
- <profiles>
- <profile>
- <id>hadoop-2.2</id>
- <properties>
- <jersey.version>1.9</jersey.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- <version>6.1.26</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- <version>${jersey.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- <version>${jersey.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>stax</groupId>
- <artifactId>stax-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- <version>${jersey.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
-</project>
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index addaddb711..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
- */
-private[spark] class Client(
- val args: ClientArguments,
- val hadoopConf: Configuration,
- val sparkConf: SparkConf)
- extends ClientBase with Logging {
-
- def this(clientArgs: ClientArguments, spConf: SparkConf) =
- this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
-
- def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
- val yarnClient = YarnClient.createYarnClient
- val yarnConf = new YarnConfiguration(hadoopConf)
-
- def stop(): Unit = yarnClient.stop()
-
- /* ------------------------------------------------------------------------------------- *
- | The following methods have much in common in the stable and alpha versions of Client, |
- | but cannot be implemented in the parent trait due to subtle API differences across |
- | hadoop versions. |
- * ------------------------------------------------------------------------------------- */
-
- /**
- * Submit an application running our ApplicationMaster to the ResourceManager.
- *
- * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
- * creating applications and setting up the application submission context. This was not
- * available in the alpha API.
- */
- override def submitApplication(): ApplicationId = {
- yarnClient.init(yarnConf)
- yarnClient.start()
-
- logInfo("Requesting a new application from cluster with %d NodeManagers"
- .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
-
- // Get a new application from our RM
- val newApp = yarnClient.createApplication()
- val newAppResponse = newApp.getNewApplicationResponse()
- val appId = newAppResponse.getApplicationId()
-
- // Verify whether the cluster has enough resources for our AM
- verifyClusterResources(newAppResponse)
-
- // Set up the appropriate contexts to launch our AM
- val containerContext = createContainerLaunchContext(newAppResponse)
- val appContext = createApplicationSubmissionContext(newApp, containerContext)
-
- // Finally, submit and monitor the application
- logInfo(s"Submitting application ${appId.getId} to ResourceManager")
- yarnClient.submitApplication(appContext)
- appId
- }
-
- /**
- * Set up the context for submitting our ApplicationMaster.
- * This uses the YarnClientApplication not available in the Yarn alpha API.
- */
- def createApplicationSubmissionContext(
- newApp: YarnClientApplication,
- containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
- val appContext = newApp.getApplicationSubmissionContext
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(containerContext)
- appContext.setApplicationType("SPARK")
- val capability = Records.newRecord(classOf[Resource])
- capability.setMemory(args.amMemory + amMemoryOverhead)
- appContext.setResource(capability)
- appContext
- }
-
- /** Set up security tokens for launching our ApplicationMaster container. */
- override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
- val dob = new DataOutputBuffer
- credentials.writeTokenStorageToStream(dob)
- amContainer.setTokens(ByteBuffer.wrap(dob.getData))
- }
-
- /** Get the application report from the ResourceManager for an application we have submitted. */
- override def getApplicationReport(appId: ApplicationId): ApplicationReport =
- yarnClient.getApplicationReport(appId)
-
- /**
- * Return the security token used by this client to communicate with the ApplicationMaster.
- * If no security is enabled, the token returned by the report is null.
- */
- override def getClientToken(report: ApplicationReport): String =
- Option(report.getClientToAMToken).map(_.toString).getOrElse("")
-}
-
-object Client {
- def main(argStrings: Array[String]) {
- if (!sys.props.contains("SPARK_SUBMIT")) {
- println("WARNING: This client is deprecated and will be removed in a " +
- "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
- }
-
- // Set an env variable indicating we are running in YARN mode.
- // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
- System.setProperty("SPARK_YARN_MODE", "true")
- val sparkConf = new SparkConf
-
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- }
-}
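For context, the deleted Client's scaladoc above points out that the stable API's YarnClient#createApplication sets up both the new-application response and the submission context, which the alpha API did not provide. The following is a minimal, hypothetical sketch of that bare stable-API flow, stripped of Spark's ClientBase plumbing; the application name, memory figure, and empty AM container spec are placeholders for illustration only, and a running ResourceManager is assumed.

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object StableSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // createApplication() is the stable-API convenience the deleted Client relies on:
    // it hands back the new-application response and a submission context to fill in.
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId

    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("submit-sketch")   // placeholder name
    // An empty container spec will not launch a real AM; it only keeps the sketch self-contained.
    appContext.setAMContainerSpec(Records.newRecord(classOf[ContainerLaunchContext]))
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(512)                        // placeholder AM memory, in MB
    appContext.setResource(capability)

    yarnClient.submitApplication(appContext)
    println(s"Submitted application $appId")
    yarnClient.stop()
  }
}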
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
deleted file mode 100644
index fdd3c2300f..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.client.api.NMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.util.JavaUtils
-
-
-class ExecutorRunnable(
- container: Container,
- conf: Configuration,
- spConf: SparkConf,
- masterAddress: String,
- slaveId: String,
- hostname: String,
- executorMemory: Int,
- executorCores: Int,
- appId: String,
- securityMgr: SecurityManager)
- extends Runnable with ExecutorRunnableUtil with Logging {
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- var nmClient: NMClient = _
- val sparkConf = spConf
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def run = {
- logInfo("Starting Executor Container")
- nmClient = NMClient.createNMClient()
- nmClient.init(yarnConf)
- nmClient.start()
- startContainer
- }
-
- def startContainer = {
- logInfo("Setting up ContainerLaunchContext")
-
- val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- .asInstanceOf[ContainerLaunchContext]
-
- val localResources = prepareLocalResources
- ctx.setLocalResources(localResources)
-
- ctx.setEnvironment(env)
-
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- ctx.setTokens(ByteBuffer.wrap(dob.getData()))
-
- val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- appId, localResources)
-
- logInfo(s"Setting up executor with environment: $env")
- logInfo("Setting up executor with commands: " + commands)
- ctx.setCommands(commands)
-
- ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
-
- // If external shuffle service is enabled, register with the Yarn shuffle service already
- // started on the NodeManager and, if authentication is enabled, provide it with our secret
- // key for fetching shuffle files later
- if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
- val secretString = securityMgr.getSecretKey()
- val secretBytes =
- if (secretString != null) {
- // This conversion must match how the YarnShuffleService decodes our secret
- JavaUtils.stringToBytes(secretString)
- } else {
- // Authentication is not enabled, so just provide dummy metadata
- ByteBuffer.allocate(0)
- }
- ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
- }
-
- // Send the start request to the ContainerManager
- nmClient.startContainer(container, ctx)
- }
-
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index 2bbf5d7db8..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
-
-/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
- */
-private[yarn] class YarnAllocationHandler(
- conf: Configuration,
- sparkConf: SparkConf,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
- securityMgr: SecurityManager)
- extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
-
- override protected def releaseContainer(container: Container) = {
- amClient.releaseAssignedContainer(container.getId())
- }
-
- // pending isn't used on stable as the AMRMClient handles incremental asks
- override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
- addResourceRequests(count)
-
- // We have already set the container request. Poll the ResourceManager for a response.
- // This doubles as a heartbeat if there are no pending container requests.
- val progressIndicator = 0.1f
- new StableAllocateResponse(amClient.allocate(progressIndicator))
- }
-
- private def createRackResourceRequests(
- hostContainers: ArrayBuffer[ContainerRequest]
- ): ArrayBuffer[ContainerRequest] = {
- // Generate modified racks and new set of hosts under it before issuing requests.
- val rackToCounts = new HashMap[String, Int]()
-
- for (container <- hostContainers) {
- val candidateHost = container.getNodes.last
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += 1
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts) {
- requestedContainers ++= createResourceRequests(
- AllocationType.RACK,
- rack,
- count,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
-
- requestedContainers
- }
-
- private def addResourceRequests(numExecutors: Int) {
- val containerRequests: List[ContainerRequest] =
- if (numExecutors <= 0) {
- logDebug("numExecutors: " + numExecutors)
- List()
- } else if (preferredHostToCount.isEmpty) {
- logDebug("host preferences is empty")
- createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
- } else {
- // Request for all hosts in preferred nodes and for numExecutors -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests ++= createResourceRequests(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
- }
- val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
- hostContainerRequests).toList
-
- val anyContainerRequests = createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
- val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
- hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
-
- containerRequestBuffer ++= hostContainerRequests
- containerRequestBuffer ++= rackContainerRequests
- containerRequestBuffer ++= anyContainerRequests
- containerRequestBuffer.toList
- }
-
- for (request <- containerRequests) {
- amClient.addContainerRequest(request)
- }
-
- for (request <- containerRequests) {
- val nodes = request.getNodes
- var hostStr = if (nodes == null || nodes.isEmpty) {
- "Any"
- } else {
- nodes.last
- }
- logInfo("Container request (host: %s, priority: %s, capability: %s".format(
- hostStr,
- request.getPriority().getPriority,
- request.getCapability))
- }
- }
-
- private def createResourceRequests(
- requestType: AllocationType.AllocationType,
- resource: String,
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- // If hostname is specified, then we need at least two requests - node local and rack local.
- // There must be a third request, which is ANY. That will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnSparkHadoopUtil.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = constructContainerRequests(
- Array(hostname),
- racks = null,
- numExecutors,
- priority)
-
- // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
- }
- case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numExecutors, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
- }
- }
-
- private def constructContainerRequests(
- hosts: Array[String],
- racks: Array[String],
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- val memoryRequest = executorMemory + memoryOverhead
- val resource = Resource.newInstance(memoryRequest, executorCores)
-
- val prioritySetting = Records.newRecord(classOf[Priority])
- prioritySetting.setPriority(priority)
-
- val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numExecutors) {
- requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
- }
- requests
- }
-
- private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
- override def getAllocatedContainers() = response.getAllocatedContainers()
- override def getAvailableResources() = response.getAvailableResources()
- override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
- }
-
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
deleted file mode 100644
index 8d4b96ed79..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{List => JList}
-
-import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
-import scala.util._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.Utils
-
-
-/**
- * YarnRMClient implementation for the Yarn stable API.
- */
-private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
-
- private var amClient: AMRMClient[ContainerRequest] = _
- private var uiHistoryAddress: String = _
- private var registered: Boolean = false
-
- override def register(
- conf: YarnConfiguration,
- sparkConf: SparkConf,
- preferredNodeLocations: Map[String, Set[SplitInfo]],
- uiAddress: String,
- uiHistoryAddress: String,
- securityMgr: SecurityManager) = {
- amClient = AMRMClient.createAMRMClient()
- amClient.init(conf)
- amClient.start()
- this.uiHistoryAddress = uiHistoryAddress
-
- logInfo("Registering the ApplicationMaster")
- synchronized {
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
- registered = true
- }
- new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
- preferredNodeLocations, securityMgr)
- }
-
- override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
- if (registered) {
- amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
- }
- }
-
- override def getAttemptId() = {
- val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- appAttemptId
- }
-
- override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
- // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
- // so not all stable releases have it.
- val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
- .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
- // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
- try {
- val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
- classOf[Configuration])
- val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
- val hosts = proxies.map { proxy => proxy.split(":")(0) }
- val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
- Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
- } catch {
- case e: NoSuchMethodException =>
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val uriBase = prefix + proxy + proxyBase
- Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
- }
- }
-
- override def getMaxRegAttempts(conf: YarnConfiguration) =
- conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-
-}
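Taken together, the deleted YarnRMClientImpl and YarnAllocationHandler wrap the stable AMRMClient register/request/allocate lifecycle. Below is a minimal, hypothetical sketch of that lifecycle against the bare API; the host, tracking URL, container size, and priority are illustrative placeholders rather than Spark's settings, and the process is assumed to be running as an ApplicationMaster container.

import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object AmAllocateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val amClient: AMRMClient[ContainerRequest] = AMRMClient.createAMRMClient()
    amClient.init(conf)
    amClient.start()

    // Register this process as the ApplicationMaster (host, RPC port, tracking URL are placeholders).
    amClient.registerApplicationMaster("localhost", 0, "")

    // Ask for one container anywhere in the cluster; size and priority are illustrative only.
    val capability = Resource.newInstance(1024, 1)
    val priority = Records.newRecord(classOf[Priority])
    priority.setPriority(1)
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))

    // AMRMClient tracks outstanding asks, so allocate() both polls for granted containers
    // and, as the comment in allocateContainers above notes, doubles as the RM heartbeat
    // when no requests are pending.
    val response = amClient.allocate(0.1f)
    println(s"Granted containers: ${response.getAllocatedContainers}")

    amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
    amClient.stop()
  }
}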
diff --git a/yarn/stable/src/test/resources/log4j.properties b/yarn/stable/src/test/resources/log4j.properties
deleted file mode 100644
index 9dd05f17f0..0000000000
--- a/yarn/stable/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
deleted file mode 100644
index d79b85e867..0000000000
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConversions._
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
-
-class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
-
- // log4j configuration for the Yarn containers, so that their output is collected
- // by Yarn instead of trying to overwrite unit-tests.log.
- private val LOG4J_CONF = """
- |log4j.rootCategory=DEBUG, console
- |log4j.appender.console=org.apache.log4j.ConsoleAppender
- |log4j.appender.console.target=System.err
- |log4j.appender.console.layout=org.apache.log4j.PatternLayout
- |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
- """.stripMargin
-
- private var yarnCluster: MiniYARNCluster = _
- private var tempDir: File = _
- private var fakeSparkJar: File = _
- private var oldConf: Map[String, String] = _
-
- override def beforeAll() {
- tempDir = Utils.createTempDir()
-
- val logConfDir = new File(tempDir, "log4j")
- logConfDir.mkdir()
-
- val logConfFile = new File(logConfDir, "log4j.properties")
- Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
- val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
- sys.props("java.class.path")
-
- oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
-
- yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
- yarnCluster.init(new YarnConfiguration())
- yarnCluster.start()
-
- // There's a race in MiniYARNCluster in which start() may return before the RM has updated
- // its address in the configuration. You can see this in the logs by noticing that when
- // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
- // test works sometimes:
- //
- // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
- //
- // That log message prints the contents of the RM_ADDRESS config variable. If you check it
- // later on, it looks something like this:
- //
- // INFO YarnClusterSuite: RM address in configuration is blah:42631
- //
- // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
- // done so in a timely manner (defined to be 10 seconds).
- val config = yarnCluster.getConfig()
- val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
- while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
- if (System.currentTimeMillis() > deadline) {
- throw new IllegalStateException("Timed out waiting for RM to come up.")
- }
- logDebug("RM address still not set in configuration, waiting...")
- TimeUnit.MILLISECONDS.sleep(100)
- }
-
- logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
- config.foreach { e =>
- sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
- }
-
- fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
- sys.props += ("spark.executor.instances" -> "1")
- sys.props += ("spark.driver.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraClassPath" -> childClasspath)
-
- super.beforeAll()
- }
-
- override def afterAll() {
- yarnCluster.stop()
- sys.props.retain { case (k, v) => !k.startsWith("spark.") }
- sys.props ++= oldConf
- super.afterAll()
- }
-
- test("run Spark in yarn-client mode") {
- var result = File.createTempFile("result", null, tempDir)
- YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
- checkResult(result)
- }
-
- test("run Spark in yarn-cluster mode") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
- var result = File.createTempFile("result", null, tempDir)
-
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--num-executors", "1")
- Client.main(args)
- checkResult(result)
- }
-
- test("run Spark in yarn-cluster mode unsuccessfully") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
- // Use only one argument so the driver will fail
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--num-executors", "1")
- val exception = intercept[SparkException] {
- Client.main(args)
- }
- assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
- }
-
- /**
- * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
- * any sort of error when the job process finishes successfully, but the job itself fails. So
- * the tests enforce that something is written to a file after everything is ok to indicate
- * that the job succeeded.
- */
- private def checkResult(result: File) = {
- var resultString = Files.toString(result, Charsets.UTF_8)
- resultString should be ("success")
- }
-
-}
-
-private object YarnClusterDriver extends Logging with Matchers {
-
- def main(args: Array[String]) = {
- if (args.length != 2) {
- System.err.println(
- s"""
- |Invalid command line: ${args.mkString(" ")}
- |
- |Usage: YarnClusterDriver [master] [result file]
- """.stripMargin)
- System.exit(1)
- }
-
- val sc = new SparkContext(new SparkConf().setMaster(args(0))
- .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
- val status = new File(args(1))
- var result = "failure"
- try {
- val data = sc.parallelize(1 to 4, 4).collect().toSet
- data should be (Set(1, 2, 3, 4))
- result = "success"
- } finally {
- sc.stop()
- Files.write(result, status, Charsets.UTF_8)
- }
- }
-
-}