aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-12-08 10:56:22 -0800
committerAaron Davidson <aaron@databricks.com>2013-12-08 11:16:52 -0800
commit40f63eb034ee5669dba87deb5f8f37c10bf5df0c (patch)
tree36da47eed091273fe5b59012c2a87f1c3d5f54e1 /yarn
parent2b0a6e7d9210ed828395243027c7001f7dae77a4 (diff)
parent10c3c0c6524d0cf6c59b6f2227bf316cdeb7d06c (diff)
downloadspark-40f63eb034ee5669dba87deb5f8f37c10bf5df0c.tar.gz
spark-40f63eb034ee5669dba87deb5f8f37c10bf5df0c.tar.bz2
spark-40f63eb034ee5669dba87deb5f8f37c10bf5df0c.zip
Merge master into 127
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala197
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala197
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala40
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala246
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala98
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala354
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala47
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala109
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala2
11 files changed, 936 insertions, 365 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4302ef4cda..240ed8b32a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,14 +17,17 @@
package org.apache.spark.deploy.yarn
-import java.io.IOException;
+import java.io.IOException
import java.net.Socket
-import java.security.PrivilegedExceptionAction
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConversions._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -32,47 +35,49 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
-import org.apache.hadoop.security.UserGroupInformation
-import scala.collection.JavaConversions._
+
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
private var rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = null
- private var appAttemptId: ApplicationAttemptId = null
- private var userThread: Thread = null
+ private var resourceManager: AMRMProtocol = _
+ private var appAttemptId: ApplicationAttemptId = _
+ private var userThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private val fs = FileSystem.get(yarnConf)
- private var yarnAllocator: YarnAllocationHandler = null
- private var isFinished:Boolean = false
- private var uiAddress: String = ""
+ private var yarnAllocator: YarnAllocationHandler = _
+ private var isFinished: Boolean = false
+ private var uiAddress: String = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
+ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
- // setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp
+ // Setup the directories so things go to yarn approved directories rather
+ // then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())
- // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
+ // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // ignore result
+ // ignore result.
// This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option ?
+ // Hence args.workerCores = numCore disabled above. Any better option?
// Compute number of threads for akka
//val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
@@ -98,7 +103,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
waitForSparkContextInitialized()
- // do this after spark master is up and SparkContext is created so that we can register UI Url
+ // Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
@@ -117,12 +122,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
+ .getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
}
- return localDirs
+ localDirs
}
private def getApplicationAttemptId(): ApplicationAttemptId = {
@@ -131,7 +136,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val containerId = ConverterUtils.toContainerId(containerIdString)
val appAttemptId = containerId.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
- return appAttemptId
+ appAttemptId
}
private def registerWithResourceManager(): AMRMProtocol = {
@@ -139,7 +144,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
logInfo("Connecting to ResourceManager at " + rmAddress)
- return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -147,12 +152,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+ // Setting this to master host,port - so that the ApplicationReport at client has some
+ // sensible info.
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
appMasterRequest.setTrackingUrl(uiAddress)
- return resourceManager.registerApplicationMaster(appMasterRequest)
+ resourceManager.registerApplicationMaster(appMasterRequest)
}
private def waitForSparkMaster() {
@@ -166,28 +172,32 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
- logInfo("Driver now available: " + driverHost + ":" + driverPort)
+ logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
driverUp = true
} catch {
- case e: Exception =>
- logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
- Thread.sleep(100)
- tries = tries + 1
+ case e: Exception => {
+ logWarning("Failed to connect to driver at %s:%s, retrying ...".
+ format(driverHost, driverPort))
+ Thread.sleep(100)
+ tries = tries + 1
+ }
}
}
}
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
- val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
- .getMethod("main", classOf[Array[String]])
+ val mainMethod = Class.forName(
+ args.userClass,
+ false /* initialize */,
+ Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
var successed = false
try {
// Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
// some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
// userThread will stop here unless it has uncaught exception thrown out
@@ -195,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
successed = true
} finally {
logDebug("finishing main")
- isLastAMRetry = true;
+ isLastAMRetry = true
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -205,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
t.start()
- return t
+ t
}
// this need to happen before allocateWorkers
@@ -227,12 +237,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
- sparkContext.preferredNodeLocationData)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
+ args,
+ sparkContext.preferredNodeLocationData)
} else {
- logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
- ", numTries = " + numTries)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+ format(count * waitTime, numTries))
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
+ args)
}
}
} finally {
@@ -248,44 +266,57 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
- // If user thread exists, then quit !
- userThread.isAlive) {
- this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
+ // Exists the loop if the user thread exits.
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(
+ math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ ApplicationMaster.incrementAllocatorLoop(1)
+ Thread.sleep(100)
}
} finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+ // Launch a progress reporter thread, else the app will get killed after expiration
+ // (def: 10mins) timeout.
+ // TODO(harvey): Verify the timeout
if (userThread.isAlive) {
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
launchReporterThread(interval)
}
}
- // TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingWorkerCount))
yarnAllocator.allocateContainers(missingWorkerCount)
}
else sendProgress()
@@ -293,16 +324,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
- // setting to daemon status, though this is usually not a good idea.
+ // Setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.start()
logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- return t
+ t
}
private def sendProgress() {
logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
+ // Simulated with an allocate request with no nodes requested ...
yarnAllocator.allocateContainers(0)
}
@@ -321,8 +352,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
-
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
return
@@ -335,14 +365,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
- // set tracking url to empty since we don't have a history server
+ finishReq.setDiagnostics(diagnostics)
+ // Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
-
}
/**
- * clean up the staging directory.
+ * Clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
@@ -358,13 +388,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
fs.delete(stagingDirPath, true)
}
} catch {
- case e: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, e)
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}
- // The shutdown hook that runs when a signal is received AND during normal
- // close of the JVM.
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
def run() {
@@ -374,15 +403,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
-
}
object ApplicationMaster {
- // number of times to wait for the allocator loop to complete.
- // each loop iteration waits for 100ms, so maximum of 3 seconds.
+ // Number of times to wait for the allocator loop to complete.
+ // Each loop iteration waits for 100ms, so maximum of 3 seconds.
// This is to ensure that we have reasonable number of containers before we start
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more
- // containers are available. Might need to handle this better.
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+ // optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
@@ -400,7 +428,8 @@ object ApplicationMaster {
applicationMasters.add(master)
}
- val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
+ val sparkContextRef: AtomicReference[SparkContext] =
+ new AtomicReference[SparkContext](null /* initialValue */)
val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
def sparkContextInitialized(sc: SparkContext): Boolean = {
@@ -410,19 +439,21 @@ object ApplicationMaster {
sparkContextRef.notifyAll()
}
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
- // Should not really have to do this, but it helps yarn to evict resources earlier.
- // not to mention, prevent Client declaring failure even though we exit'ed properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets called to
- // late and the filesystem is already shutdown.
+ // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+ // System.exit.
+ // Should not really have to do this, but it helps YARN to evict resources earlier.
+ // Not to mention, prevent the Client from declaring failure even though we exited properly.
+ // Note that this will unfortunately not properly clean up the staging files because it gets
+ // called too late, after the filesystem is already shutdown.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
+ // This is not only logs, but also ensures that log system is initialized for this instance
+ // when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
- // best case ...
+ // Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
@@ -430,7 +461,7 @@ object ApplicationMaster {
} )
}
- // Wait for initialization to complete and atleast 'some' nodes can get allocated
+ // Wait for initialization to complete and atleast 'some' nodes can get allocated.
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4e0e060ddc..79dd038065 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,49 +17,54 @@
package org.apache.spark.deploy.yarn
-import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-import scala.collection.JavaConversions._
-
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
+
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-
+
def this(args: ClientArguments) = this(new Configuration(), args)
-
+
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
- // staging directory is private! -> rwx--------
+ // Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
- // app files are world-wide readable and owner writable -> rw-r--r--
+
+ // App files are world-wide readable and owner writable -> rw-r--r--
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
- def run() {
+ // for client user who want to monitor app status by itself.
+ def runApp() = {
+ validateArgs()
+
init(yarnConf)
start()
logClusterResourceDetails()
@@ -79,33 +84,59 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
submitApp(appContext)
-
+ appId
+ }
+
+ def run() {
+ val appId = runApp()
monitorApplication(appId)
System.exit(0)
}
+ def validateArgs() = {
+ Map(
+ (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
+ "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
+ "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
+ ).foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
+ logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+ clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
- ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
- ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
+ logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+ queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+ queueInfo.getQueueName,
+ queueInfo.getCurrentCapacity,
+ queueInfo.getMaximumCapacity,
+ queueInfo.getApplications.size,
+ queueInfo.getChildQueues.size))
}
-
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
- // if we have requested more then the clusters max for a single resource then exit.
+
+ // If we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -114,10 +145,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(1)
}
- // We could add checks to make sure the entire cluster has enough resources but that involves getting
- // all the node reports and computing ourselves
+ // We could add checks to make sure the entire cluster has enough resources but that involves
+ // getting all the node reports and computing ourselves
}
-
+
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
logInfo("Setting up application submission context for ASM")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -126,9 +157,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
return appContext
}
- /*
- * see if two file systems are the same or not.
- */
+ /** See if two file systems are the same or not. */
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
val srcUri = srcFs.getUri()
val dstUri = destFs.getUri()
@@ -142,8 +171,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
} catch {
case e: UnknownHostException =>
return false
@@ -160,30 +189,27 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}
- /**
- * Copy the file into HDFS if needed.
- */
+ /** Copy the file into HDFS if needed. */
private def copyRemoteFile(
dstDir: Path,
originalPath: Path,
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf);
+ val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
- // resolve any symlinks in the URI path so using a "current" symlink
- // to point to a specific version shows the specific version
- // in the distributed cache configuration
+ // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
+ // version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
val fc = FileContext.getFileContext(qualPath.toUri(), conf)
val destPath = fc.resolvePath(qualPath)
@@ -192,11 +218,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
- // Upload Spark and the application JAR to the remote file system if necessary
- // Add them as local resources to the AM
+ // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+ // local resources to the AM.
val fs = FileSystem.get(conf)
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@ -208,18 +234,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
- logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
- System.exit(1)
- }
-
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
@@ -228,7 +249,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var localURI = new URI(localPath)
// if not specified assume these are in the local filesystem to keep behavior like Hadoop
if (localURI.getScheme() == null) {
- localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
+ localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
@@ -273,10 +294,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}
-
+
def setupLaunchEnv(
localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
@@ -289,16 +310,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
- // set the environment variables to be passed on to the Workers
+ // Set the environment variables to be passed on to the Workers.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
- // allow users to specify some environment variables
+ // Allow users to specify some environment variables.
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
- // Add each SPARK-* key to the environment
+ // Add each SPARK-* key to the environment.
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- return env
+ env
}
def userArgsToString(clientArgs: ClientArguments): String = {
@@ -308,13 +329,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
for (arg <- args){
retval.append(prefix).append(" '").append(arg).append("' ")
}
-
retval.toString
}
- def createContainerLaunchContext(newApp: GetNewApplicationResponse,
- localResources: HashMap[String, LocalResource],
- env: HashMap[String, String]): ContainerLaunchContext = {
+ def createContainerLaunchContext(
+ newApp: GetNewApplicationResponse,
+ localResources: HashMap[String, LocalResource],
+ env: HashMap[String, String]): ContainerLaunchContext = {
logInfo("Setting up container launch context")
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
@@ -322,8 +343,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+ // TODO(harvey): This can probably be a val.
var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
- (if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD
+ ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+ YarnAllocationHandler.MEMORY_OVERHEAD)
// Extra options for the JVM
var JAVA_OPTS = ""
@@ -334,14 +357,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
- // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
- // node, spark gc effects all other containers performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
- // limited to subset of cores on a node.
- if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) {
- // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tenant machines
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out. The context is, default gc for server class machines
+ // end up using all cores to do gc - hence if there are multiple containers in same node,
+ // spark gc effects all other containers performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+ // of cores on a node.
+ val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+ java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+ if (useConcurrentAndIncrementalGC) {
+ // In our expts, using (default) throughput collector has severe perf ramnifications in
+ // multi-tenant machines
JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
JAVA_OPTS += " -XX:+CMSIncrementalMode "
JAVA_OPTS += " -XX:+CMSIncrementalPacing "
@@ -354,21 +381,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- if (args.userClass == null) {
- logError("Error: You must specify a user class!")
- System.exit(1)
- }
-
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
- " org.apache.spark.deploy.yarn.ApplicationMaster" +
+ " " + args.amClass +
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
@@ -379,28 +401,28 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Command for the ApplicationMaster: " + commands(0))
amContainer.setCommands(commands)
-
+
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster
+ // Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
amContainer.setResource(capability)
- // Setup security tokens
+ // Setup security tokens.
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- return amContainer
+ amContainer
}
-
+
def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager
+ // Submit the application to the applications manager.
logInfo("Submitting application to ASM")
super.submitApplication(appContext)
}
-
+
def monitorApplication(appId: ApplicationId): Boolean = {
- while(true) {
+ while (true) {
Thread.sleep(1000)
val report = super.getApplicationReport(appId)
@@ -418,16 +440,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
"\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
"\t appUser: " + report.getUser()
)
-
+
val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
- return true
+ return true
}
}
- return true
+ true
}
}
@@ -442,6 +464,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
+
new Client(args).run
}
@@ -459,7 +482,7 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
- // normally the users app.jar is last in case conflicts with spark jars
+ // Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 852dbd7dab..b9dbc3fb87 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,6 +35,7 @@ class ClientArguments(val args: Array[String]) {
var numWorkers = 2
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
+ var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
@@ -62,18 +63,22 @@ class ClientArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--master-memory") :: MemoryParam(value) :: tail =>
- amMemory = value
+ case ("--master-class") :: value :: tail =>
+ amClass = value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--master-memory") :: MemoryParam(value) :: tail =>
+ amMemory = value
args = tail
case ("--worker-memory") :: MemoryParam(value) :: tail =>
workerMemory = value
args = tail
+ case ("--num-workers") :: IntParam(value) :: tail =>
+ numWorkers = value
+ args = tail
+
case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
args = tail
@@ -119,19 +124,20 @@ class ClientArguments(val args: Array[String]) {
System.err.println(
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --name NAME The name of your application (Default: Spark)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
- " --files files Comma separated list of files to be distributed with the job.\n" +
- " --archives archives Comma separated list of archives to be distributed with the job."
+ " --jar JAR_PATH Path to your application's JAR file (required)\n" +
+ " --class CLASS_NAME Name of your application's main class (required)\n" +
+ " --args ARGS Arguments to be passed to your application's main class.\n" +
+ " Mutliple invocations are possible, each will be passed in order.\n" +
+ " --num-workers NUM Number of workers to start (Default: 2)\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+ " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
+ " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+ " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --name NAME The name of your application (Default: Spark)\n" +
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
+ " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+ " --files files Comma separated list of files to be distributed with the job.\n" +
+ " --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 07686fefd7..5f159b073f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
@@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging {
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache);
+ val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
- return true;
+ return true
}
return false
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
new file mode 100644
index 0000000000..421a83c87a
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote.RemoteClientShutdown
+import akka.actor.Terminated
+import akka.remote.RemoteClientDisconnected
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+ def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+ private val rpc: YarnRPC = YarnRPC.create(conf)
+ private var resourceManager: AMRMProtocol = null
+ private var appAttemptId: ApplicationAttemptId = null
+ private var reporterThread: Thread = null
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ private var yarnAllocator: YarnAllocationHandler = null
+ private var driverClosed:Boolean = false
+
+ val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1
+ var actor: ActorRef = null
+
+ // This actor just working as a monitor to watch on Driver Actor.
+ class MonitorActor(driverUrl: String) extends Actor {
+
+ var driver: ActorRef = null
+
+ override def preStart() {
+ logInfo("Listen to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! "hello"
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ }
+
+ override def receive = {
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ logInfo("Driver terminated or disconnected! Shutting down.")
+ driverClosed = true
+ }
+ }
+
+ def run() {
+
+ appAttemptId = getApplicationAttemptId()
+ resourceManager = registerWithResourceManager()
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+ // Compute number of threads for akka
+ val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+ if (minimumMemory > 0) {
+ val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ }
+ }
+
+ waitForSparkMaster()
+
+ // Allocate all containers
+ allocateWorkers()
+
+ // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+ // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+
+ val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ // must be <= timeoutInterval/ 2.
+ // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
+ // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ reporterThread = launchReporterThread(interval)
+
+ // Wait for the reporter thread to Finish.
+ reporterThread.join()
+
+ finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ actorSystem.shutdown()
+
+ logInfo("Exited")
+ System.exit(0)
+ }
+
+ private def getApplicationAttemptId(): ApplicationAttemptId = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+ return appAttemptId
+ }
+
+ private def registerWithResourceManager(): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+ logInfo("Registering the ApplicationMaster")
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(appAttemptId)
+ // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ // What do we provide here ? Might make sense to expose something sensible later ?
+ appMasterRequest.setTrackingUrl("")
+ return resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+ private def waitForSparkMaster() {
+ logInfo("Waiting for spark driver to be reachable.")
+ var driverUp = false
+ val hostport = args.userArgs(0)
+ val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+ while(!driverUp) {
+ try {
+ val socket = new Socket(driverHost, driverPort)
+ socket.close()
+ logInfo("Master now available: " + driverHost + ":" + driverPort)
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+ Thread.sleep(100)
+ }
+ }
+ System.setProperty("spark.driver.host", driverHost)
+ System.setProperty("spark.driver.port", driverPort.toString)
+
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ }
+
+
+ private def allocateWorkers() {
+
+ // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()
+
+ yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData)
+
+ logInfo("Allocating " + args.numWorkers + " workers.")
+ // Wait until all containers have finished
+ // TODO: This is a bit ugly. Can we make it nicer?
+ // TODO: Handle container failure
+ while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+ yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ Thread.sleep(100)
+ }
+
+ logInfo("All workers have launched.")
+
+ }
+
+ // TODO: We might want to extend this to allocate more containers in case they die !
+ private def launchReporterThread(_sleepTime: Long): Thread = {
+ val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+ val t = new Thread {
+ override def run() {
+ while (!driverClosed) {
+ val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+ if (missingWorkerCount > 0) {
+ logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingWorkerCount)
+ }
+ else sendProgress()
+ Thread.sleep(sleepTime)
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ return t
+ }
+
+ private def sendProgress() {
+ logDebug("Sending progress")
+ // simulated with an allocate request with no nodes requested ...
+ yarnAllocator.allocateContainers(0)
+ }
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ logInfo("finish ApplicationMaster with " + status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+}
+
+
+object WorkerLauncher {
+ def main(argStrings: Array[String]) {
+ val args = new ApplicationMasterArguments(argStrings)
+ new WorkerLauncher(args).run()
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 7a66532254..6a90cc51cf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -21,53 +21,59 @@ import java.net.URI
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
- slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
- extends Runnable with Logging {
-
+
+class WorkerRunnable(
+ container: Container,
+ conf: Configuration,
+ masterAddress: String,
+ slaveId: String,
+ hostname: String,
+ workerMemory: Int,
+ workerCores: Int)
+ extends Runnable with Logging {
+
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = null
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
+
def run = {
logInfo("Starting Worker Container")
cm = connectToCM
startContainer
}
-
+
def startContainer = {
logInfo("Setting up ContainerLaunchContext")
-
+
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
-
+
ctx.setContainerId(container.getId())
ctx.setResource(container.getResource())
val localResources = prepareLocalResources
ctx.setLocalResources(localResources)
-
+
val env = prepareEnvironment
ctx.setEnvironment(env)
-
+
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
@@ -80,17 +86,21 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
- // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
- // node, spark gc effects all other containers performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
- // limited to subset of cores on a node.
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out.
+ // The context is, default gc for server class machines end up using all cores to do gc - hence
+ // if there are multiple containers in same node, spark gc effects all other containers
+ // performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+ // of cores on a node.
/*
else {
// If no java_opts specified, default to using -XX:+CMSIncrementalMode
- // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont want to mess with it.
- // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tennent machines
+ // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
+ // want to mess with it.
+ // In our expts, using (default) throughput collector has severe perf ramnifications in
+ // multi-tennent machines
// The options are based on
// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
@@ -108,7 +118,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
@@ -117,8 +127,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
- // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
+ // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+ // an inconsistent state.
+ // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+ // 'something' to fail job ... akin to blacklisting trackers in mapred ?
" -XX:OnOutOfMemoryError='kill %p' " +
JAVA_OPTS +
" org.apache.spark.executor.CoarseGrainedExecutorBackend " +
@@ -130,7 +142,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Setting up worker with commands: " + commands)
ctx.setCommands(commands)
-
+
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
@@ -138,7 +150,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
cm.startContainer(startReq)
}
- private def setupDistributedCache(file: String,
+ private def setupDistributedCache(
+ file: String,
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
@@ -153,12 +166,11 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
amJarRsrc.setSize(size.toLong)
localResources(uri.getFragment()) = amJarRsrc
}
-
-
+
def prepareLocalResources: HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
val localResources = HashMap[String, LocalResource]()
-
+
if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
@@ -180,32 +192,32 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
timeStamps(i), fileSizes(i), visibilities(i))
}
}
-
+
logInfo("Prepared Local resources " + localResources)
return localResources
}
-
+
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
- // allow users to specify some environment variables
+ // Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
}
-
+
def connectToCM: ContainerManager = {
val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
logInfo("Connecting to ContainerManager at " + cmHostPortStr)
- // use doAs and remoteUser here so we can add the container token and not
- // pollute the current users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
+ // Use doAs and remoteUser here so we can add the container token and not pollute the current
+ // users credentials with all of the individual container tokens
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
if (containerToken != null) {
user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
}
@@ -216,8 +228,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
return rpc.getProxy(classOf[ContainerManager],
cmAddress, conf).asInstanceOf[ContainerManager]
}
- });
- return proxy;
+ })
+ proxy
}
-
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 25da9aa917..f15f3c7c11 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,87 +17,112 @@
package org.apache.spark.deploy.yarn
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
+import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
-import scala.collection
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.AMRMProtocol
+import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.util.{RackResolver, Records}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import collection.JavaConversions._
-import collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.hadoop.conf.Configuration
-import java.util.{Collections, Set => JSet}
-import java.lang.{Boolean => JBoolean}
+
object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
type AllocationType = Value
val HOST, RACK, ANY = Value
}
-// too many params ? refactor it 'somehow' ?
-// needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive : should make it
-// more proactive and decoupled.
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
-// on how we are requesting for containers.
-private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceManager: AMRMProtocol,
- val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int, val workerMemory: Int, val workerCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int])
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting for containers.
+private[yarn] class YarnAllocationHandler(
+ val conf: Configuration,
+ val resourceManager: AMRMProtocol,
+ val appAttemptId: ApplicationAttemptId,
+ val maxWorkers: Int,
+ val workerMemory: Int,
+ val workerCores: Int,
+ val preferredHostToCount: Map[String, Int],
+ val preferredRackToCount: Map[String, Int])
extends Logging {
-
-
// These three are locked on allocatedHostToContainersMap. Complementary data structures
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping
- private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]()
+ // allocatedContainerToHostMap: container to host mapping.
+ private val allocatedHostToContainersMap =
+ new HashMap[String, collection.mutable.Set[ContainerId]]()
+
private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on allocatedHostToContainersMap
+
+ // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+ // allocated node)
+ // As with the two data structures above, tightly coupled with them, and to be locked on
+ // allocatedHostToContainersMap
private val allocatedRackCount = new HashMap[String, Int]()
- // containers which have been released.
+ // Containers which have been released.
private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // containers to be released in next request to RM
+ // Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
private val numWorkersRunning = new AtomicInteger()
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
+ private val numWorkersFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def allocateContainers(workersToRequest: Int) {
- // We need to send the request only once from what I understand ... but for now, not modifying this much.
+ // We need to send the request only once from what I understand ... but for now, not modifying
+ // this much.
// Keep polling the Resource Manager for containers
val amResp = allocateWorkerResources(workersToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
- if (_allocatedContainers.size > 0) {
-
- logDebug("Allocated " + _allocatedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
- logDebug("Cluster Resources: " + amResp.getAvailableResources)
+ if (_allocatedContainers.size > 0) {
+ logDebug("""
+ Allocated containers: %d
+ Current worker count: %d
+ Containers released: %s
+ Containers to be released: %s
+ Cluster resources: %s
+ """.format(
+ _allocatedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers,
+ amResp.getAvailableResources))
val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
- // ignore if not satisfying constraints {
+ // Ignore if not satisfying constraints {
for (container <- _allocatedContainers) {
if (isResourceConstraintSatisfied(container)) {
// allocatedContainers += container
@@ -111,8 +136,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else releasedContainerList.add(container.getId())
}
- // Find the appropriate containers to use
- // Slightly non trivial groupBy I guess ...
+ // Find the appropriate containers to use. Slightly non trivial groupBy ...
val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
@@ -132,21 +156,22 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
remainingContainers = null
}
else if (requiredHostCount > 0) {
- // container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredHostCount)
- // and rest as remainingContainer
- val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
+ // Container list has more containers than we need for data locality.
+ // Split into two : data local container count of (remainingContainers.size -
+ // requiredHostCount) and rest as remainingContainer
+ val (dataLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredHostCount)
dataLocalContainers.put(candidateHost, dataLocal)
// remainingContainers = remaining
// yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
- // add remaining to release list. If we have insufficient containers, next allocation cycle
- // will reallocate (but wont treat it as data local)
+ // add remaining to release list. If we have insufficient containers, next allocation
+ // cycle will reallocate (but wont treat it as data local)
for (container <- remaining) releasedContainerList.add(container.getId())
remainingContainers = null
}
- // now rack local
+ // Now rack local
if (remainingContainers != null){
val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
@@ -159,15 +184,17 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
if (requiredRackCount >= remainingContainers.size){
// Add all to dataLocalContainers
dataLocalContainers.put(rack, remainingContainers)
- // all consumed
+ // All consumed
remainingContainers = null
}
else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredRackCount)
- // and rest as remainingContainer
- val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
+ // Split into two : data local container count of (remainingContainers.size -
+ // requiredRackCount) and rest as remainingContainer
+ val (rackLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredRackCount)
+ val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+ new ArrayBuffer[Container]())
existingRackLocal ++= rackLocal
remainingContainers = remaining
@@ -183,8 +210,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// Now that we have split the containers into various groups, go through them in order :
// first host local, then rack local and then off rack (everything else).
- // Note that the list we create below tries to ensure that not all containers end up within a host
- // if there are sufficiently large number of hosts/containers.
+ // Note that the list we create below tries to ensure that not all containers end up within a
+ // host if there are sufficiently large number of hosts/containers.
val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
@@ -197,33 +224,39 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val workerHostname = container.getNodeId.getHost
val containerId = container.getId
- assert (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ assert(
+ container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
if (numWorkersRunningNow > maxWorkers) {
- logInfo("Ignoring container " + containerId + " at host " + workerHostname +
- " .. we already have required number of containers")
+ logInfo("""Ignoring container %s at host %s, since we already have the required number of
+ containers for it.""".format(containerId, workerHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
numWorkersRunning.decrementAndGet()
}
else {
- // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
+ // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+ // (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
- // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..
+ // Just to be safe, simply remove it from pendingReleaseContainers.
+ // Should not be there, but ..
pendingReleaseContainers.remove(containerId)
val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, new HashSet[ContainerId]())
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ new HashSet[ContainerId]())
containerSet += containerId
allocatedContainerToHostMap.put(containerId, workerHostname)
- if (rack != null) allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ if (rack != null) {
+ allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ }
}
new Thread(
@@ -232,17 +265,23 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
).start()
}
}
- logDebug("After allocated " + allocatedContainers.size + " containers (orig : " +
- _allocatedContainers.size + "), current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
+ logDebug("""
+ Finished processing %d containers.
+ Current number of workers running: %d,
+ releasedContainerList: %s,
+ pendingReleaseContainers: %s
+ """.format(
+ allocatedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers))
}
val completedContainers = amResp.getCompletedContainersStatuses()
if (completedContainers.size > 0){
- logDebug("Completed " + completedContainers.size + " containers, current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
-
+ logDebug("Completed %d containers, to-be-released: %s".format(
+ completedContainers.size, releasedContainerList))
for (completedContainer <- completedContainers){
val containerId = completedContainer.getContainerId
@@ -251,10 +290,19 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
pendingReleaseContainers.remove(containerId)
}
else {
- // simply decrement count - next iteration of ReporterThread will take care of allocating !
+ // Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus()))
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {
@@ -271,7 +319,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
allocatedContainerToHostMap -= containerId
- // doing this within locked context, sigh ... move to outside ?
+ // Doing this within locked context, sigh ... move to outside ?
val rack = YarnAllocationHandler.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
@@ -281,9 +329,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
}
}
}
- logDebug("After completed " + completedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
+ logDebug("""
+ Finished processing %d completed containers.
+ Current number of workers running: %d,
+ releasedContainerList: %s,
+ pendingReleaseContainers: %s
+ """.format(
+ completedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers))
}
}
@@ -337,7 +392,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// default.
if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences ? " + preferredHostToCount.isEmpty)
+ logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
}
@@ -350,17 +405,24 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
if (requiredCount > 0) {
- hostContainerRequests +=
- createResourceRequest(AllocationType.HOST, candidateHost, requiredCount, YarnAllocationHandler.PRIORITY)
+ hostContainerRequests += createResourceRequest(
+ AllocationType.HOST,
+ candidateHost,
+ requiredCount,
+ YarnAllocationHandler.PRIORITY)
}
}
- val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(hostContainerRequests.toList)
+ val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
+ hostContainerRequests.toList)
- val anyContainerRequests: ResourceRequest =
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)
+ val anyContainerRequests: ResourceRequest = createResourceRequest(
+ AllocationType.ANY,
+ resource = null,
+ numWorkers,
+ YarnAllocationHandler.PRIORITY)
- val containerRequests: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1)
+ val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
+ hostContainerRequests.size + rackContainerRequests.size + 1)
containerRequests ++= hostContainerRequests
containerRequests ++= rackContainerRequests
@@ -378,55 +440,60 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
-
-
if (numWorkers > 0) {
- logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
+ logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
+ workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
- for (req <- resourceRequests) {
- logInfo("rsrcRequest ... host : " + req.getHostName + ", numContainers : " + req.getNumContainers +
- ", p = " + req.getPriority().getPriority + ", capability: " + req.getCapability)
+ for (request <- resourceRequests) {
+ logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
+ format(
+ request.getHostName,
+ request.getNumContainers,
+ request.getPriority,
+ request.getCapability))
}
resourceManager.allocate(req)
}
- private def createResourceRequest(requestType: AllocationType.AllocationType,
- resource:String, numWorkers: Int, priority: Int): ResourceRequest = {
+ private def createResourceRequest(
+ requestType: AllocationType.AllocationType,
+ resource:String,
+ numWorkers: Int,
+ priority: Int): ResourceRequest = {
// If hostname specified, we need atleast two requests - node local and rack local.
// There must be a third request - which is ANY : that will be specially handled.
requestType match {
case AllocationType.HOST => {
- assert (YarnAllocationHandler.ANY_HOST != resource)
-
+ assert(YarnAllocationHandler.ANY_HOST != resource)
val hostname = resource
val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
- // add to host->rack mapping
+ // Add to host->rack mapping
YarnAllocationHandler.populateRackInfo(conf, hostname)
nodeLocal
}
-
case AllocationType.RACK => {
val rack = resource
createResourceRequestImpl(rack, numWorkers, priority)
}
-
- case AllocationType.ANY => {
- createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority)
- }
-
- case _ => throw new IllegalArgumentException("Unexpected/unsupported request type .. " + requestType)
+ case AllocationType.ANY => createResourceRequestImpl(
+ YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+ case _ => throw new IllegalArgumentException(
+ "Unexpected/unsupported request type: " + requestType)
}
}
- private def createResourceRequestImpl(hostname:String, numWorkers: Int, priority: Int): ResourceRequest = {
+ private def createResourceRequestImpl(
+ hostname:String,
+ numWorkers: Int,
+ priority: Int): ResourceRequest = {
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
@@ -447,11 +514,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
val retval = new ArrayBuffer[ContainerId](1)
- // iterator on COW list ...
+ // Iterator on COW list ...
for (container <- releasedContainerList.iterator()){
retval += container
}
- // remove from the original list.
+ // Remove from the original list.
if (! retval.isEmpty) {
releasedContainerList.removeAll(retval)
for (v <- retval) pendingReleaseContainers.put(v, true)
@@ -466,14 +533,14 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
object YarnAllocationHandler {
val ANY_HOST = "*"
- // all requests are issued with same priority : we do not (yet) have any distinction between
+ // All requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val PRIORITY = 1
// Additional memory overhead - in mb
val MEMORY_OVERHEAD = 384
- // host to rack map - saved from allocation requests
+ // Host to rack map - saved from allocation requests
// We are expecting this not to change.
// Note that it is possible for this to change : and RM will indicate that to us via update
// response to allocate. But we are punting on handling that for now.
@@ -481,38 +548,69 @@ object YarnAllocationHandler {
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
- def newAllocator(conf: Configuration,
- resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments): YarnAllocationHandler = {
-
- new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
- args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ def newAllocator(
+ conf: Configuration,
+ resourceManager: AMRMProtocol,
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(
+ conf,
+ resourceManager,
+ appAttemptId,
+ args.numWorkers,
+ args.workerMemory,
+ args.workerCores,
+ Map[String, Int](),
+ Map[String, Int]())
}
- def newAllocator(conf: Configuration,
- resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
+ def newAllocator(
+ conf: Configuration,
+ resourceManager: AMRMProtocol,
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ map: collection.Map[String,
+ collection.Set[SplitInfo]]): YarnAllocationHandler = {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
- new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
- args.workerMemory, args.workerCores, hostToCount, rackToCount)
+ new YarnAllocationHandler(
+ conf,
+ resourceManager,
+ appAttemptId,
+ args.numWorkers,
+ args.workerMemory,
+ args.workerCores,
+ hostToCount,
+ rackToCount)
}
- def newAllocator(conf: Configuration,
- resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
- maxWorkers: Int, workerMemory: Int, workerCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
+ def newAllocator(
+ conf: Configuration,
+ resourceManager: AMRMProtocol,
+ appAttemptId: ApplicationAttemptId,
+ maxWorkers: Int,
+ workerMemory: Int,
+ workerCores: Int,
+ map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(conf, resourceManager, appAttemptId, maxWorkers,
- workerMemory, workerCores, hostToCount, rackToCount)
+ new YarnAllocationHandler(
+ conf,
+ resourceManager,
+ appAttemptId,
+ maxWorkers,
+ workerMemory,
+ workerCores,
+ hostToCount,
+ rackToCount)
}
// A simple method to copy the split info map.
- private def generateNodeToWeight(conf: Configuration, input: collection.Map[String, collection.Set[SplitInfo]]) :
+ private def generateNodeToWeight(
+ conf: Configuration,
+ input: collection.Map[String, collection.Set[SplitInfo]]) :
// host to count, rack to count
(Map[String, Int], Map[String, Int]) = {
@@ -536,7 +634,7 @@ object YarnAllocationHandler {
}
def lookupRack(conf: Configuration, host: String): String = {
- if (! hostToRack.contains(host)) populateRackInfo(conf, host)
+ if (!hostToRack.contains(host)) populateRackInfo(conf, host)
hostToRack.get(host)
}
@@ -559,10 +657,12 @@ object YarnAllocationHandler {
val rack = rackInfo.getNetworkLocation
hostToRack.put(hostname, rack)
if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack, Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+ rackToHostSet.putIfAbsent(rack,
+ Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
}
rackToHostSet.get(rack).add(hostname)
+ // TODO(harvey): Figure out this comment...
// Since RackResolver caches, we are disabling this for now ...
} /* else {
// right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ca2f1e2565..2ba2366ead 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,13 +18,10 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.deploy.SparkHadoopUtil
-import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
@@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials();
+ val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
new file mode 100644
index 0000000000..63a0449e5a
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.util.Utils
+
+/**
+ *
+ * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ */
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+
+ def this(sc: SparkContext) = this(sc, new Configuration())
+
+ // By default, rack is unknown
+ override def getRackForHost(hostPort: String): Option[String] = {
+ val host = Utils.parseHostPort(hostPort)._1
+ val retval = YarnAllocationHandler.lookupRack(conf, host)
+ if (retval != null) Some(retval) else None
+ }
+
+ override def postStartHook() {
+
+ // The yarn application is running, but the worker might not yet ready
+ // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ Thread.sleep(2000L)
+ logInfo("YarnClientClusterScheduler.postStartHook done")
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000000..b206780c78
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+
+private[spark] class YarnClientSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ with Logging {
+
+ var client: Client = null
+ var appId: ApplicationId = null
+
+ override def start() {
+ super.start()
+
+ val defalutWorkerCores = "2"
+ val defalutWorkerMemory = "512m"
+ val defaultWorkerNumber = "1"
+
+ val userJar = System.getenv("SPARK_YARN_APP_JAR")
+ var workerCores = System.getenv("SPARK_WORKER_CORES")
+ var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
+ var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
+
+ if (userJar == null)
+ throw new SparkException("env SPARK_YARN_APP_JAR is not set")
+
+ if (workerCores == null)
+ workerCores = defalutWorkerCores
+ if (workerMemory == null)
+ workerMemory = defalutWorkerMemory
+ if (workerNumber == null)
+ workerNumber = defaultWorkerNumber
+
+ val driverHost = System.getProperty("spark.driver.host")
+ val driverPort = System.getProperty("spark.driver.port")
+ val hostport = driverHost + ":" + driverPort
+
+ val argsArray = Array[String](
+ "--class", "notused",
+ "--jar", userJar,
+ "--args", hostport,
+ "--worker-memory", workerMemory,
+ "--worker-cores", workerCores,
+ "--num-workers", workerNumber,
+ "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+ )
+
+ val args = new ClientArguments(argsArray)
+ client = new Client(args)
+ appId = client.runApp()
+ waitForApp()
+ }
+
+ def waitForApp() {
+
+ // TODO : need a better way to find out whether the workers are ready or not
+ // maybe by resource usage report?
+ while(true) {
+ val report = client.getApplicationReport(appId)
+
+ logInfo("Application report from ASM: \n" +
+ "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+ "\t appStartTime: " + report.getStartTime() + "\n" +
+ "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+ )
+
+ // Ready to go, or already gone.
+ val state = report.getYarnApplicationState()
+ if (state == YarnApplicationState.RUNNING) {
+ return
+ } else if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ throw new SparkException("Yarn application already ended," +
+ "might be killed or not able to launch application master.")
+ }
+
+ Thread.sleep(1000)
+ }
+ }
+
+ override def stop() {
+ super.stop()
+ client.stop()
+ logInfo("Stoped")
+ }
+
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index c0a2af0c6f..2941356bc5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar