aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorRaymond Liu <raymond.liu@intel.com>2013-12-04 13:20:27 +0800
committerRaymond Liu <raymond.liu@intel.com>2014-01-03 12:12:37 +0800
commit3dc379ce5aa51cc9c41f590d79c350b5dea33fc3 (patch)
tree5a99812d5e89006a3f1d4106e6eca4eb51e81433 /yarn/src
parent498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95 (diff)
downloadspark-3dc379ce5aa51cc9c41f590d79c350b5dea33fc3.tar.gz
spark-3dc379ce5aa51cc9c41f590d79c350b5dea33fc3.tar.bz2
spark-3dc379ce5aa51cc9c41f590d79c350b5dea33fc3.zip
Reorganize yarn related codes into sub projects to remove duplicate files.
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala458
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala94
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala505
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala147
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala228
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala247
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala235
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala680
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala43
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala48
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala110
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala59
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala220
13 files changed, 0 insertions, 3074 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 7cf120d3eb..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, SparkContext, Logging}
-import org.apache.spark.util.Utils
-
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
- private var rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = _
- private var appAttemptId: ApplicationAttemptId = _
- private var userThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private val fs = FileSystem.get(yarnConf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var isFinished: Boolean = false
- private var uiAddress: String = _
- private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
- YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
- private var isLastAMRetry: Boolean = true
-
- private val sparkConf = new SparkConf()
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
-
- def run() {
- // Setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
-
- // set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
-
- // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
- appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
- resourceManager = registerWithResourceManager()
-
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // ignore result.
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option?
-
- // Compute number of threads for akka
- //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- //if (minimumMemory > 0) {
- // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- // if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- // }
- //}
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
- ApplicationMaster.register(this)
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkContextInitialized()
-
- // Do this after spark master is up and SparkContext is created so that we can register UI Url
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
- // Allocate all containers
- allocateWorkers()
-
- // Wait for the user class to Finish
- userThread.join()
-
- System.exit(0)
- }
-
- /** Get the Yarn approved local directories. */
- private def getLocalDirs(): String = {
- // Hadoop 0.23 and 2.x have different Environment variable names for the
- // local dirs, so lets check both. We assume one of the 2 is set.
- // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
- val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
- .getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
-
- if (localDirs.isEmpty()) {
- throw new Exception("Yarn Local dirs can't be empty")
- }
- localDirs
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some
- // sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- appMasterRequest.setTrackingUrl(uiAddress)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- val mainMethod = Class.forName(
- args.userClass,
- false /* initialize */,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
- var successed = false
- try {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
- // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
- // userThread will stop here unless it has uncaught exception thrown out
- // It need shutdown hook to set SUCCEEDED
- successed = true
- } finally {
- logDebug("finishing main")
- isLastAMRetry = true
- if (successed) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.start()
- t
- }
-
- // this need to happen before allocateWorkers
- private def waitForSparkContextInitialized() {
- logInfo("Waiting for spark context initialization")
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var count = 0
- val waitTime = 10000L
- val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
- logInfo("Waiting for spark context initialization ... " + count)
- count = count + 1
- ApplicationMaster.sparkContextRef.wait(waitTime)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null || count >= numTries)
-
- if (null != sparkContext) {
- uiAddress = sparkContext.ui.appUIAddress
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.preferredNodeLocationData,
- sparkContext.getConf)
- } else {
- logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
- format(count * waitTime, numTries))
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.getConf)
- }
- }
- } finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
- }
- }
-
- private def allocateWorkers() {
- try {
- logInfo("Allocating " + args.numWorkers + " workers.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
-
- // Exists the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
- }
- yarnAllocator.allocateContainers(
- math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
- }
- } finally {
- // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
- // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
- }
- logInfo("All workers have launched.")
-
- // Launch a progress reporter thread, else the app will get killed after expiration
- // (def: 10mins) timeout.
- // TODO(harvey): Verify the timeout
- if (userThread.isAlive) {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- launchReporterThread(interval)
- }
- }
-
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
- }
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.allocateContainers(missingWorkerCount)
- }
- else sendProgress()
- Thread.sleep(sleepTime)
- }
- }
- }
- // Setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // Simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- /*
- def printContainers(containers: List[Container]) = {
- for (container <- containers) {
- logInfo("Launching shell command on a new container."
- + ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
- + ":" + container.getNodeId().getPort()
- + ", containerNodeURI=" + container.getNodeHttpAddress()
- + ", containerState" + container.getState()
- + ", containerResourceMemory"
- + container.getResource().getMemory())
- }
- }
- */
-
- def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
- }
-
- logInfo("finishApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- // Set tracking url to empty since we don't have a history server.
- finishReq.setTrackingUrl("")
- resourceManager.finishApplicationMaster(finishReq)
- }
-
- /**
- * Clean up the staging directory.
- */
- private def cleanupStagingDir() {
- var stagingDirPath: Path = null
- try {
- val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
- if (!preserveFiles) {
- stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
- if (stagingDirPath == null) {
- logError("Staging directory is null")
- return
- }
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
- class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
- def run() {
- logInfo("AppMaster received a signal.")
- // we need to clean up staging dir before HDFS is shut down
- // make sure we don't delete it until this is the last AM
- if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
- }
- }
-}
-
-object ApplicationMaster {
- // Number of times to wait for the allocator loop to complete.
- // Each loop iteration waits for 100ms, so maximum of 3 seconds.
- // This is to ensure that we have reasonable number of containers before we start
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
- // optimal as more containers are available. Might need to handle this better.
- private val ALLOCATOR_LOOP_WAIT_COUNT = 30
- def incrementAllocatorLoop(by: Int) {
- val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.synchronized {
- // to wake threads off wait ...
- yarnAllocatorLoop.notifyAll()
- }
- }
- }
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null /* initialValue */)
- val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
-
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
- // System.exit.
- // Should not really have to do this, but it helps YARN to evict resources earlier.
- // Not to mention, prevent the Client from declaring failure even though we exited properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets
- // called too late, after the filesystem is already shutdown.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not only logs, but also ensures that log system is initialized for this instance
- // when we are actually 'run'-ing.
- logInfo("Adding shutdown hook for context " + sc)
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // Best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- } )
- }
-
- // Wait for initialization to complete and atleast 'some' nodes can get allocated.
- yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.wait(1000L)
- }
- }
- modified
- }
-
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
deleted file mode 100644
index f76a5ddd39..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.util.IntParam
-import collection.mutable.ArrayBuffer
-
-class ApplicationMasterArguments(val args: Array[String]) {
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer = new ArrayBuffer[String]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- }
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
- System.exit(exitCode)
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 2bd047c97a..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.{InetAddress, UnknownHostException, URI}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
-
-
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-
- def this(args: ClientArguments) = this(new Configuration(), args)
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- private val SPARK_STAGING: String = ".sparkStaging"
- private val distCacheMgr = new ClientDistributedCacheManager()
- private val sparkConf = new SparkConf
-
- // Staging directory is private! -> rwx--------
- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
-
- // App files are world-wide readable and owner writable -> rw-r--r--
- val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
-
- // for client user who want to monitor app status by itself.
- def runApp() = {
- validateArgs()
-
- init(yarnConf)
- start()
- logClusterResourceDetails()
-
- val newApp = super.getNewApplication()
- val appId = newApp.getApplicationId()
-
- verifyClusterResources(newApp)
- val appContext = createApplicationSubmissionContext(appId)
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val env = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newApp, localResources, env)
-
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
- submitApp(appContext)
- appId
- }
-
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- System.exit(0)
- }
-
- def validateArgs() = {
- Map(
- (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
- (args.userJar == null) -> "Error: You must specify a user jar!",
- (args.userClass == null) -> "Error: You must specify a user class!",
- (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
- (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
- "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
- "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
- ).foreach { case(cond, errStr) =>
- if (cond) {
- logError(errStr)
- args.printUsageAndExit(1)
- }
- }
- }
-
- def getAppStagingDir(appId: ApplicationId): String = {
- SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
- clusterMetrics.getNumNodeManagers)
-
- val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
- queueApplicationCount = %s, queueChildQueueCount = %s""".format(
- queueInfo.getQueueName,
- queueInfo.getCurrentCapacity,
- queueInfo.getMaximumCapacity,
- queueInfo.getApplications.size,
- queueInfo.getChildQueues.size))
- }
-
- def verifyClusterResources(app: GetNewApplicationResponse) = {
- val maxMem = app.getMaximumResourceCapability().getMemory()
- logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
- // If we have requested more then the clusters max for a single resource then exit.
- if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory)
- System.exit(1)
- }
- val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- if (amMem > maxMem) {
- logError("AM size is to large to run on this cluster " + amMem)
- System.exit(1)
- }
-
- // We could add checks to make sure the entire cluster has enough resources but that involves
- // getting all the node reports and computing ourselves
- }
-
- def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
- logInfo("Setting up application submission context for ASM")
- val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
- appContext.setApplicationId(appId)
- appContext.setApplicationName(args.appName)
- return appContext
- }
-
- /** See if two file systems are the same or not. */
- private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
- val srcUri = srcFs.getUri()
- val dstUri = destFs.getUri()
- if (srcUri.getScheme() == null) {
- return false
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
- return false
- }
- var srcHost = srcUri.getHost()
- var dstHost = dstUri.getHost()
- if ((srcHost != null) && (dstHost != null)) {
- try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
- } catch {
- case e: UnknownHostException =>
- return false
- }
- if (!srcHost.equals(dstHost)) {
- return false
- }
- } else if (srcHost == null && dstHost != null) {
- return false
- } else if (srcHost != null && dstHost == null) {
- return false
- }
- //check for ports
- if (srcUri.getPort() != dstUri.getPort()) {
- return false
- }
- return true
- }
-
- /** Copy the file into HDFS if needed. */
- private def copyRemoteFile(
- dstDir: Path,
- originalPath: Path,
- replication: Short,
- setPerms: Boolean = false): Path = {
- val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf)
- var newPath = originalPath
- if (! compareFs(remoteFs, fs)) {
- newPath = new Path(dstDir, originalPath.getName())
- logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
- fs.setReplication(newPath, replication)
- if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
- }
- // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
- // version shows the specific version in the distributed cache configuration
- val qualPath = fs.makeQualified(newPath)
- val fc = FileContext.getFileContext(qualPath.toUri(), conf)
- val destPath = fc.resolvePath(qualPath)
- destPath
- }
-
- def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- // Upload Spark and the application JAR to the remote file system if necessary. Add them as
- // local resources to the AM.
- val fs = FileSystem.get(conf)
-
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- if (UserGroupInformation.isSecurityEnabled()) {
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- logError("Can't get Master Kerberos principal for use as renewer")
- System.exit(1)
- }
- }
- val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
- if (UserGroupInformation.isSecurityEnabled()) {
- val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials)
- }
- val localResources = HashMap[String, LocalResource]()
- FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
-
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-
- Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
- Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
- .foreach { case(destName, _localPath) =>
- val localPath: String = if (_localPath != null) _localPath.trim() else ""
- if (! localPath.isEmpty()) {
- var localURI = new URI(localPath)
- // if not specified assume these are in the local filesystem to keep behavior like Hadoop
- if (localURI.getScheme() == null) {
- localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
- }
- val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
- val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- destName, statCache)
- }
- }
-
- // handle any add jars
- if ((args.addJars != null) && (!args.addJars.isEmpty())){
- args.addJars.split(',').foreach { case file: String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache, true)
- }
- }
-
- // handle any distributed cache files
- if ((args.files != null) && (!args.files.isEmpty())){
- args.files.split(',').foreach { case file: String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache)
- }
- }
-
- // handle any distributed cache archives
- if ((args.archives != null) && (!args.archives.isEmpty())) {
- args.archives.split(',').foreach { case file:String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
- linkname, statCache)
- }
- }
-
- UserGroupInformation.getCurrentUser().addCredentials(credentials)
- return localResources
- }
-
- def setupLaunchEnv(
- localResources: HashMap[String, LocalResource],
- stagingDir: String): HashMap[String, String] = {
- logInfo("Setting up the launch environment")
- val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
-
- val env = new HashMap[String, String]()
-
- Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
- env("SPARK_YARN_MODE") = "true"
- env("SPARK_YARN_STAGING_DIR") = stagingDir
-
- // Set the environment variables to be passed on to the Workers.
- distCacheMgr.setDistFilesEnv(env)
- distCacheMgr.setDistArchivesEnv(env)
-
- // Allow users to specify some environment variables.
- Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
- // Add each SPARK-* key to the environment.
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- env
- }
-
- def userArgsToString(clientArgs: ClientArguments): String = {
- val prefix = " --args "
- val args = clientArgs.userArgs
- val retval = new StringBuilder()
- for (arg <- args){
- retval.append(prefix).append(" '").append(arg).append("' ")
- }
- retval.toString
- }
-
- def createContainerLaunchContext(
- newApp: GetNewApplicationResponse,
- localResources: HashMap[String, LocalResource],
- env: HashMap[String, String]): ContainerLaunchContext = {
- logInfo("Setting up container launch context")
- val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
- amContainer.setLocalResources(localResources)
- amContainer.setEnvironment(env)
-
- val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
-
- // TODO(harvey): This can probably be a val.
- var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
- ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
- YarnAllocationHandler.MEMORY_OVERHEAD)
-
- // Extra options for the JVM
- var JAVA_OPTS = ""
-
- // Add Xmx for am memory
- JAVA_OPTS += "-Xmx" + amMemory + "m "
-
- JAVA_OPTS += " -Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove
- // it once cpuset version is pushed out. The context is, default gc for server class machines
- // end up using all cores to do gc - hence if there are multiple containers in same node,
- // spark gc effects all other containers performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
- // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
- // of cores on a node.
- val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
- java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
- if (useConcurrentAndIncrementalGC) {
- // In our expts, using (default) throughput collector has severe perf ramnifications in
- // multi-tenant machines
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
- }
-
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
- }
-
- // Command for the ApplicationMaster
- var javaCommand = "java"
- val javaHome = System.getenv("JAVA_HOME")
- if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
- javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
- }
-
- val commands = List[String](javaCommand +
- " -server " +
- JAVA_OPTS +
- " " + args.amClass +
- " --class " + args.userClass +
- " --jar " + args.userJar +
- userArgsToString(args) +
- " --worker-memory " + args.workerMemory +
- " --worker-cores " + args.workerCores +
- " --num-workers " + args.numWorkers +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- logInfo("Command for the ApplicationMaster: " + commands(0))
- amContainer.setCommands(commands)
-
- val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- amContainer.setResource(capability)
-
- // Setup security tokens.
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
- amContainer
- }
-
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
- }
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = super.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToken: " + report.getClientToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- val dsStatus = report.getFinalApplicationStatus()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
-}
-
-object Client {
- val SPARK_JAR: String = "spark.jar"
- val APP_JAR: String = "app.jar"
- val LOG4J_PROP: String = "log4j.properties"
-
- def main(argStrings: Array[String]) {
- // Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
- System.setProperty("SPARK_YARN_MODE", "true")
-
- val args = new ClientArguments(argStrings)
-
- new Client(args).run
- }
-
- // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
- def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
- for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
- }
- }
-
- def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
- // If log4j present, ensure ours overrides all others
- if (addLog4j) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + LOG4J_PROP)
- }
- // Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
- if (userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + APP_JAR)
- }
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + SPARK_JAR)
- Client.populateHadoopClasspath(conf, env)
-
- if (!userClasspathFirst) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + APP_JAR)
- }
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + "*")
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
deleted file mode 100644
index 9075ca71e7..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.MemoryParam
-import org.apache.spark.util.IntParam
-import collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
-
-// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
- var addJars: String = null
- var files: String = null
- var archives: String = null
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
- var amQueue = new SparkConf().get("QUEUE", "default")
- var amMemory: Int = 512
- var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
- var appName: String = "Spark"
- // TODO
- var inputFormatInfo: List[InputFormatInfo] = null
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
- val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--master-class") :: value :: tail =>
- amClass = value
- args = tail
-
- case ("--master-memory") :: MemoryParam(value) :: tail =>
- amMemory = value
- args = tail
-
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case ("--queue") :: value :: tail =>
- amQueue = value
- args = tail
-
- case ("--name") :: value :: tail =>
- appName = value
- args = tail
-
- case ("--addJars") :: value :: tail =>
- addJars = value
- args = tail
-
- case ("--files") :: value :: tail =>
- files = value
- args = tail
-
- case ("--archives") :: value :: tail =>
- archives = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- inputFormatInfo = inputFormatMap.values.toList
- }
-
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
- " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --name NAME The name of your application (Default: Spark)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
- " --files files Comma separated list of files to be distributed with the job.\n" +
- " --archives archives Comma separated list of archives to be distributed with the job."
- )
- System.exit(exitCode)
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
deleted file mode 100644
index 5f159b073f..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import org.apache.spark.Logging
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
-
-/** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
- private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
- private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
-
-
- /**
- * Add a resource to the list of distributed cache resources. This list can
- * be sent to the ApplicationMaster and possibly the workers so that it can
- * be downloaded into the Hadoop distributed cache for use by this application.
- * Adds the LocalResource to the localResources HashMap passed in and saves
- * the stats of the resources to they can be sent to the workers and verified.
- *
- * @param fs FileSystem
- * @param conf Configuration
- * @param destPath path to the resource
- * @param localResources localResource hashMap to insert the resource into
- * @param resourceType LocalResourceType
- * @param link link presented in the distributed cache to the destination
- * @param statCache cache to store the file/directory stats
- * @param appMasterOnly Whether to only add the resource to the app master
- */
- def addResource(
- fs: FileSystem,
- conf: Configuration,
- destPath: Path,
- localResources: HashMap[String, LocalResource],
- resourceType: LocalResourceType,
- link: String,
- statCache: Map[URI, FileStatus],
- appMasterOnly: Boolean = false) = {
- val destStatus = fs.getFileStatus(destPath)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(resourceType)
- val visibility = getVisibility(conf, destPath.toUri(), statCache)
- amJarRsrc.setVisibility(visibility)
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
- amJarRsrc.setTimestamp(destStatus.getModificationTime())
- amJarRsrc.setSize(destStatus.getLen())
- if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
- localResources(link) = amJarRsrc
-
- if (appMasterOnly == false) {
- val uri = destPath.toUri()
- val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
- if (resourceType == LocalResourceType.FILE) {
- distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- } else {
- distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- }
- }
- }
-
- /**
- * Adds the necessary cache file env variables to the env passed in
- * @param env
- */
- def setDistFilesEnv(env: Map[String, String]) = {
- val (keys, tupleValues) = distCacheFiles.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
- }
- }
-
- /**
- * Adds the necessary cache archive env variables to the env passed in
- * @param env
- */
- def setDistArchivesEnv(env: Map[String, String]) = {
- val (keys, tupleValues) = distCacheArchives.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
- }
- }
-
- /**
- * Returns the local resource visibility depending on the cache file permissions
- * @param conf
- * @param uri
- * @param statCache
- * @return LocalResourceVisibility
- */
- def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- if (isPublic(conf, uri, statCache)) {
- return LocalResourceVisibility.PUBLIC
- }
- return LocalResourceVisibility.PRIVATE
- }
-
- /**
- * Returns a boolean to denote whether a cache file is visible to all(public)
- * or not
- * @param conf
- * @param uri
- * @param statCache
- * @return true if the path in the uri is visible to all, false otherwise
- */
- def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
- val fs = FileSystem.get(uri, conf)
- val current = new Path(uri.getPath())
- //the leaf level file should be readable by others
- if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
- return false
- }
- return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
- }
-
- /**
- * Returns true if all ancestors of the specified path have the 'execute'
- * permission set for all users (i.e. that other users can traverse
- * the directory heirarchy to the given path)
- * @param fs
- * @param path
- * @param statCache
- * @return true if all ancestors have the 'execute' permission set for all users
- */
- def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
- statCache: Map[URI, FileStatus]): Boolean = {
- var current = path
- while (current != null) {
- //the subdirs in the path should have execute permissions for others
- if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
- return false
- }
- current = current.getParent()
- }
- return true
- }
-
- /**
- * Checks for a given path whether the Other permissions on it
- * imply the permission in the passed FsAction
- * @param fs
- * @param path
- * @param action
- * @param statCache
- * @return true if the path in the uri is visible to all, false otherwise
- */
- def checkPermissionOfOther(fs: FileSystem, path: Path,
- action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache)
- val perms = status.getPermission()
- val otherAction = perms.getOtherAction()
- if (otherAction.implies(action)) {
- return true
- }
- return false
- }
-
- /**
- * Checks to see if the given uri exists in the cache, if it does it
- * returns the existing FileStatus, otherwise it stats the uri, stores
- * it in the cache, and returns the FileStatus.
- * @param fs
- * @param uri
- * @param statCache
- * @return FileStatus
- */
- def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
- val stat = statCache.get(uri) match {
- case Some(existstat) => existstat
- case None =>
- val newStat = fs.getFileStatus(new Path(uri))
- statCache.put(uri, newStat)
- newStat
- }
- return stat
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index a8de89c670..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
- private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = null
- private var appAttemptId: ApplicationAttemptId = null
- private var reporterThread: Thread = null
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = null
- private var driverClosed:Boolean = false
- private val sparkConf = new SparkConf
-
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
- var actor: ActorRef = null
-
- // This actor just working as a monitor to watch on Driver Actor.
- class MonitorActor(driverUrl: String) extends Actor {
-
- var driver: ActorSelection = null
-
- override def preStart() {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- driver ! "hello"
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
-
- override def receive = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- driverClosed = true
- }
- }
-
- def run() {
-
- appAttemptId = getApplicationAttemptId()
- resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- }
- }
-
- waitForSparkMaster()
-
- // Allocate all containers
- allocateWorkers()
-
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
- reporterThread = launchReporterThread(interval)
-
- // Wait for the reporter thread to Finish.
- reporterThread.join()
-
- finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- actorSystem.shutdown()
-
- logInfo("Exited")
- System.exit(0)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- return appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
- return resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while(!driverUp) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
- Thread.sleep(100)
- }
- }
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
-
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
- }
-
-
- private def allocateWorkers() {
-
- // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map()
-
- yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
- args, preferredNodeLocationData, sparkConf)
-
- logInfo("Allocating " + args.numWorkers + " workers.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
- yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
- Thread.sleep(100)
- }
-
- logInfo("All workers have launched.")
-
- }
-
- // TODO: We might want to extend this to allocate more containers in case they die !
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingWorkerCount)
- }
- else sendProgress()
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- return t
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus) {
-
- logInfo("finish ApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- resourceManager.finishApplicationMaster(finishReq)
- }
-
-}
-
-
-object WorkerLauncher {
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index 6a90cc51cf..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.Logging
-
-
-class WorkerRunnable(
- container: Container,
- conf: Configuration,
- masterAddress: String,
- slaveId: String,
- hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with Logging {
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- var cm: ContainerManager = null
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def run = {
- logInfo("Starting Worker Container")
- cm = connectToCM
- startContainer
- }
-
- def startContainer = {
- logInfo("Setting up ContainerLaunchContext")
-
- val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- .asInstanceOf[ContainerLaunchContext]
-
- ctx.setContainerId(container.getId())
- ctx.setResource(container.getResource())
- val localResources = prepareLocalResources
- ctx.setLocalResources(localResources)
-
- val env = prepareEnvironment
- ctx.setEnvironment(env)
-
- // Extra options for the JVM
- var JAVA_OPTS = ""
- // Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
- }
-
- JAVA_OPTS += " -Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove
- // it once cpuset version is pushed out.
- // The context is, default gc for server class machines end up using all cores to do gc - hence
- // if there are multiple containers in same node, spark gc effects all other containers
- // performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
- // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
- // of cores on a node.
-/*
- else {
- // If no java_opts specified, default to using -XX:+CMSIncrementalMode
- // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
- // want to mess with it.
- // In our expts, using (default) throughput collector has severe perf ramnifications in
- // multi-tennent machines
- // The options are based on
- // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
- }
-*/
-
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
- var javaCommand = "java"
- val javaHome = System.getenv("JAVA_HOME")
- if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
- javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
- }
-
- val commands = List[String](javaCommand +
- " -server " +
- // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
- // an inconsistent state.
- // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
- // 'something' to fail job ... akin to blacklisting trackers in mapred ?
- " -XX:OnOutOfMemoryError='kill %p' " +
- JAVA_OPTS +
- " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
- masterAddress + " " +
- slaveId + " " +
- hostname + " " +
- workerCores +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- logInfo("Setting up worker with commands: " + commands)
- ctx.setCommands(commands)
-
- // Send the start request to the ContainerManager
- val startReq = Records.newRecord(classOf[StartContainerRequest])
- .asInstanceOf[StartContainerRequest]
- startReq.setContainerLaunchContext(ctx)
- cm.startContainer(startReq)
- }
-
- private def setupDistributedCache(
- file: String,
- rtype: LocalResourceType,
- localResources: HashMap[String, LocalResource],
- timestamp: String,
- size: String,
- vis: String) = {
- val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(rtype)
- amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
- amJarRsrc.setTimestamp(timestamp.toLong)
- amJarRsrc.setSize(size.toLong)
- localResources(uri.getFragment()) = amJarRsrc
- }
-
- def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val localResources = HashMap[String, LocalResource]()
-
- if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
- for( i <- 0 to distFiles.length - 1) {
- setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
- fileSizes(i), visibilities(i))
- }
- }
-
- if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
- val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
- for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
- timeStamps(i), fileSizes(i), visibilities(i))
- }
- }
-
- logInfo("Prepared Local resources " + localResources)
- return localResources
- }
-
- def prepareEnvironment: HashMap[String, String] = {
- val env = new HashMap[String, String]()
-
- Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
- // Allow users to specify some environment variables
- Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- return env
- }
-
- def connectToCM: ContainerManager = {
- val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
- val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
- logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
- // Use doAs and remoteUser here so we can add the container token and not pollute the current
- // users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString())
- val containerToken = container.getContainerToken()
- if (containerToken != null) {
- user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
- }
-
- val proxy = user
- .doAs(new PrivilegedExceptionAction[ContainerManager] {
- def run: ContainerManager = {
- return rpc.getProxy(classOf[ContainerManager],
- cmAddress, conf).asInstanceOf[ContainerManager]
- }
- })
- proxy
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index c8af653b3f..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,680 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val resourceManager: AMRMProtocol,
- val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
-
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
-
- // Containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // Containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
-
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
-
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
-
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- }
-
- def allocateContainers(workersToRequest: Int) {
- // We need to send the request only once from what I understand ... but for now, not modifying
- // this much.
-
- // Keep polling the Resource Manager for containers
- val amResp = allocateWorkerResources(workersToRequest).getAMResponse
-
- val _allocatedContainers = amResp.getAllocatedContainers()
-
- if (_allocatedContainers.size > 0) {
- logDebug("""
- Allocated containers: %d
- Current worker count: %d
- Containers released: %s
- Containers to be released: %s
- Cluster resources: %s
- """.format(
- _allocatedContainers.size,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers,
- amResp.getAvailableResources))
-
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- // Ignore if not satisfying constraints {
- for (container <- _allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // allocatedContainers += container
-
- val host = container.getNodeId.getHost
- val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
-
- containers += container
- }
- // Add all ignored containers to released list
- else releasedContainerList.add(container.getId())
- }
-
- // Find the appropriate containers to use. Slightly non trivial groupBy ...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet)
- {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
- assert(remainingContainers != null)
-
- if (requiredHostCount >= remainingContainers.size){
- // Since we got <= required containers, add all to dataLocalContainers
- dataLocalContainers.put(candidateHost, remainingContainers)
- // all consumed
- remainingContainers = null
- }
- else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size -
- // requiredHostCount) and rest as remainingContainer
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
- // remainingContainers = remaining
-
- // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
- // add remaining to release list. If we have insufficient containers, next allocation
- // cycle will reallocate (but wont treat it as data local)
- for (container <- remaining) releasedContainerList.add(container.getId())
- remainingContainers = null
- }
-
- // Now rack local
- if (remainingContainers != null){
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
-
- if (rack != null){
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.get(rack).getOrElse(List()).size
-
-
- if (requiredRackCount >= remainingContainers.size){
- // Add all to dataLocalContainers
- dataLocalContainers.put(rack, remainingContainers)
- // All consumed
- remainingContainers = null
- }
- else if (requiredRackCount > 0) {
- // container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size -
- // requiredRackCount) and rest as remainingContainer
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
- remainingContainers = remaining
- }
- }
- }
-
- // If still not consumed, then it is off rack host - add to that list.
- if (remainingContainers != null){
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order :
- // first host local, then rack local and then off rack (everything else).
- // Note that the list we create below tries to ensure that not all containers end up within a
- // host if there are sufficiently large number of hosts/containers.
-
- val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers
- for (container <- allocatedContainers) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- assert(
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-
- if (numWorkersRunningNow > maxWorkers) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
- releasedContainerList.add(containerId)
- // reset counter back to old value.
- numWorkersRunning.decrementAndGet()
- }
- else {
- // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
- // (workerIdCounter)
- val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("launching container on " + containerId + " host " + workerHostname)
- // Just to be safe, simply remove it from pendingReleaseContainers.
- // Should not be there, but ..
- pendingReleaseContainers.remove(containerId)
-
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
-
- new Thread(
- new WorkerRunnable(container, conf, driverUrl, workerId,
- workerHostname, workerMemory, workerCores)
- ).start()
- }
- }
- logDebug("""
- Finished processing %d containers.
- Current number of workers running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- allocatedContainers.size,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
-
-
- val completedContainers = amResp.getCompletedContainersStatuses()
- if (completedContainers.size > 0){
- logDebug("Completed %d containers, to-be-released: %s".format(
- completedContainers.size, releasedContainerList))
- for (completedContainer <- completedContainers){
- val containerId = completedContainer.getContainerId
-
- // Was this released by us ? If yes, then simply remove from containerSet and move on.
- if (pendingReleaseContainers.containsKey(containerId)) {
- pendingReleaseContainers.remove(containerId)
- }
- else {
- // Simply decrement count - next iteration of ReporterThread will take care of allocating.
- numWorkersRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus()))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
- }
- }
-
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
- assert (host != null)
-
- val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
- assert (containerSet != null)
-
- containerSet -= containerId
- if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
- else allocatedHostToContainersMap.update(host, containerSet)
-
- allocatedContainerToHostMap -= containerId
-
- // Doing this within locked context, sigh ... move to outside ?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
- else allocatedRackCount.remove(rack)
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of workers running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- completedContainers.size,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
- }
-
- def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
- // First generate modified racks and new set of hosts under it : then issue requests
- val rackToCounts = new HashMap[String, Int]()
-
- // Within this lock - used to read/write to the rack related maps too.
- for (container <- hostContainers) {
- val candidateHost = container.getHostName
- val candidateNumContainers = container.getNumContainers
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
-
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += candidateNumContainers
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts){
- requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
- }
-
- requestedContainers.toList
- }
-
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
-
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
-
- private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
-
- var resourceRequests: List[ResourceRequest] = null
-
- // default.
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
- resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
- }
- else {
- // request for all hosts in preferred nodes and for numWorkers -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests += createResourceRequest(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- YarnAllocationHandler.PRIORITY)
- }
- }
- val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
- hostContainerRequests.toList)
-
- val anyContainerRequests: ResourceRequest = createResourceRequest(
- AllocationType.ANY,
- resource = null,
- numWorkers,
- YarnAllocationHandler.PRIORITY)
-
- val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
- hostContainerRequests.size + rackContainerRequests.size + 1)
-
- containerRequests ++= hostContainerRequests
- containerRequests ++= rackContainerRequests
- containerRequests += anyContainerRequests
-
- resourceRequests = containerRequests.toList
- }
-
- val req = Records.newRecord(classOf[AllocateRequest])
- req.setResponseId(lastResponseId.incrementAndGet)
- req.setApplicationAttemptId(appAttemptId)
-
- req.addAllAsks(resourceRequests)
-
- val releasedContainerList = createReleasedContainerList()
- req.addAllReleases(releasedContainerList)
-
- if (numWorkers > 0) {
- logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
- workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- }
- else {
- logDebug("Empty allocation req .. release : " + releasedContainerList)
- }
-
- for (request <- resourceRequests) {
- logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
- format(
- request.getHostName,
- request.getNumContainers,
- request.getPriority,
- request.getCapability))
- }
- resourceManager.allocate(req)
- }
-
-
- private def createResourceRequest(
- requestType: AllocationType.AllocationType,
- resource:String,
- numWorkers: Int,
- priority: Int): ResourceRequest = {
-
- // If hostname specified, we need atleast two requests - node local and rack local.
- // There must be a third request - which is ANY : that will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
-
- // Add to host->rack mapping
- YarnAllocationHandler.populateRackInfo(conf, hostname)
-
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- createResourceRequestImpl(rack, numWorkers, priority)
- }
- case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numWorkers, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
- }
- }
-
- private def createResourceRequestImpl(
- hostname:String,
- numWorkers: Int,
- priority: Int): ResourceRequest = {
-
- val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
- val memCapability = Records.newRecord(classOf[Resource])
- // There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- rsrcRequest.setCapability(memCapability)
-
- val pri = Records.newRecord(classOf[Priority])
- pri.setPriority(priority)
- rsrcRequest.setPriority(pri)
-
- rsrcRequest.setHostName(hostname)
-
- rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
- rsrcRequest
- }
-
- def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-
- val retval = new ArrayBuffer[ContainerId](1)
- // Iterator on COW list ...
- for (container <- releasedContainerList.iterator()){
- retval += container
- }
- // Remove from the original list.
- if (! retval.isEmpty) {
- releasedContainerList.removeAll(retval)
- for (v <- retval) pendingReleaseContainers.put(v, true)
- logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
- pendingReleaseContainers)
- }
-
- retval
- }
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests
- // We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
- // response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
-
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]) :
- // host to count, rack to count
- (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) return (Map[String, Int](), Map[String, Int]())
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
- }
-
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) populateRackInfo(conf, host)
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- val set = rackToHostSet.get(rack)
- if (set == null) return None
-
- // No better way to get a Set[String] from JSet ?
- val convertedSet: collection.mutable.Set[String] = set
- Some(convertedSet.toSet)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list ?
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out this comment...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index 2ba2366ead..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
- // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- override def isYarnMode(): Boolean = { true }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- // Always create a new config, dont reuse yarnConf.
- override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials()
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
deleted file mode 100644
index 522e0a9ad7..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
-
- // The yarn application is running, but the worker might not yet ready
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(2000L)
- logInfo("YarnClientClusterScheduler.postStartHook done")
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 4b69f5078b..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
- with Logging {
-
- var client: Client = null
- var appId: ApplicationId = null
-
- override def start() {
- super.start()
-
- val defalutWorkerCores = "2"
- val defalutWorkerMemory = "512m"
- val defaultWorkerNumber = "1"
-
- val userJar = System.getenv("SPARK_YARN_APP_JAR")
- var workerCores = System.getenv("SPARK_WORKER_CORES")
- var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
- var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
- if (userJar == null)
- throw new SparkException("env SPARK_YARN_APP_JAR is not set")
-
- if (workerCores == null)
- workerCores = defalutWorkerCores
- if (workerMemory == null)
- workerMemory = defalutWorkerMemory
- if (workerNumber == null)
- workerNumber = defaultWorkerNumber
-
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
- val hostport = driverHost + ":" + driverPort
-
- val argsArray = Array[String](
- "--class", "notused",
- "--jar", userJar,
- "--args", hostport,
- "--worker-memory", workerMemory,
- "--worker-cores", workerCores,
- "--num-workers", workerNumber,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
- )
-
- val args = new ClientArguments(argsArray)
- client = new Client(args)
- appId = client.runApp()
- waitForApp()
- }
-
- def waitForApp() {
-
- // TODO : need a better way to find out whether the workers are ready or not
- // maybe by resource usage report?
- while(true) {
- val report = client.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
- )
-
- // Ready to go, or already gone.
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.RUNNING) {
- return
- } else if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application already ended," +
- "might be killed or not able to launch application master.")
- }
-
- Thread.sleep(1000)
- }
- }
-
- override def stop() {
- super.stop()
- client.stop()
- logInfo("Stoped")
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index 2d9fbcb400..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
- * ApplicationMaster, etc. is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
-
- logInfo("Created YarnClusterScheduler")
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
- // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
- // Subsequent creations are ignored - since nodes are already allocated by then.
-
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
- val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
- if (sparkContextInitialized){
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(3000L)
- }
- logInfo("YarnClusterScheduler.postStartHook done")
- }
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index 2941356bc5..0000000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito.when
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-
-class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
-
- class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
- override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- return LocalResourceVisibility.PRIVATE
- }
- }
-
- test("test getFileStatus empty") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath() === null)
- }
-
- test("test getFileStatus cached") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath().toString() === "/tmp/testing")
- }
-
- test("test addResource") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 0)
- assert(resource.getSize() === 0)
- assert(resource.getType() === LocalResourceType.FILE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
-
- //add another one and verify both there and order correct
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing2"))
- val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
- when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
- statCache, false)
- val resource2 = localResources("link2")
- assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
- assert(resource2.getTimestamp() === 10)
- assert(resource2.getSize() === 20)
- assert(resource2.getType() === LocalResourceType.FILE)
-
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
- assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
- assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
- assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
- assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
- }
-
- test("test addResource link null") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- intercept[Exception] {
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
- statCache, false)
- }
- assert(localResources.get("link") === None)
- assert(localResources.size === 0)
- }
-
- test("test addResource appmaster only") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, true)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
- }
-
- test("test addResource archive") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
- }
-
-
-}