author     Sandy Ryza <sandy@cloudera.com>          2014-03-13 12:11:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-03-13 12:11:33 -0700
commit     698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch)
tree       a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn
parent     e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff)
download   spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.gz
           spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.bz2
           spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.zip
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 28
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 124
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 27
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 46
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 18
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 4
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala (renamed from yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala) | 14
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 4
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 26
-rw-r--r-- yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38
-rw-r--r-- yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 26
-rw-r--r-- yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14
-rw-r--r-- yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 138
15 files changed, 289 insertions, 270 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 87785cdc60..910484ed54 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // this need to happen before allocateWorkers
+ // this need to happen before allocateExecutors
private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
try {
@@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
// Exists the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateContainers(
- math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.allocateContainers(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
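
The maxNumExecutorFailures hunk in this file (and the matching one in the stable ApplicationMaster further down) keeps the deprecated spark.yarn.max.worker.failures key working by nesting it as the default of the new spark.yarn.max.executor.failures key. A script-style Scala sketch of that fallback shape; the getInt helper and the conf map below are hypothetical stand-ins for SparkConf, not Spark code:

    // Stand-in for SparkConf.getInt, used only to show the nested-fallback shape.
    def getInt(conf: Map[String, String], key: String, default: Int): Int =
      conf.get(key).map(_.toInt).getOrElse(default)

    val conf = Map("spark.yarn.max.worker.failures" -> "7") // a user still setting the old key
    val numExecutors = 2

    // New key wins if present, then the deprecated key, then max(numExecutors * 2, 3).
    val maxNumExecutorFailures =
      getInt(conf, "spark.yarn.max.executor.failures",
        getInt(conf, "spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

    println(maxNumExecutorFailures) // prints 7: only the deprecated key is set
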
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b735d01df8..7b0e020263 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
@@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
@@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
args, preferredNodeLocationData, sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
- yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingWorkerCount)
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
@@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8c686e393f..981e8b05f6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
cm = connectToCM
startContainer
}
@@ -81,8 +81,8 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
- logInfo("Setting up worker with commands: " + commands)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e91257be8e..2056667af5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
- def allocateContainers(workersToRequest: Int) {
+ def allocateContainers(executorsToRequest: Int) {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
// Keep polling the Resource Manager for containers
- val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+ val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
if (_allocatedContainers.size > 0) {
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to be released: %s
Cluster resources: %s
""".format(
_allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
amResp.getAvailableResources))
@@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers
for (container <- allocatedContainers) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
assert(
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
}
else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
- // (workerIdCounter)
- val workerId = workerIdCounter.incrementAndGet().toString
+ // (executorIdCounter)
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("launching container on " + containerId + " host " + workerHostname)
+ logInfo("launching container on " + containerId + " host " + executorHostname)
// Just to be safe, simply remove it from pendingReleaseContainers.
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
new Thread(
- new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
- workerHostname, workerMemory, workerCores)
+ new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
+ executorHostname, executorMemory, executorCores)
).start()
}
}
logDebug("""
Finished processing %d containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler(
}
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler(
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+ private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
// default.
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+ createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
- // request for all hosts in preferred nodes and for numWorkers -
+ // request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
@@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler(
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
- if (numWorkers > 0) {
- logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
- workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ if (numExecutors > 0) {
+ logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+ executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
@@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
requestType: AllocationType.AllocationType,
resource:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
// If hostname specified, we need atleast two requests - node local and rack local.
@@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler(
case AllocationType.HOST => {
assert(YarnAllocationHandler.ANY_HOST != resource)
val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
+ val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
YarnAllocationHandler.populateRackInfo(conf, hostname)
@@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- createResourceRequestImpl(rack, numWorkers, priority)
+ createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+ YarnAllocationHandler.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequestImpl(
hostname:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
@@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
rsrcRequest.setHostName(hostname)
- rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
+ rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
rsrcRequest
}
@@ -560,9 +560,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -582,9 +582,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToCount,
rackToCount,
sparkConf)
@@ -594,9 +594,9 @@ object YarnAllocationHandler {
conf: Configuration,
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf): YarnAllocationHandler = {
@@ -606,9 +606,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f76a5ddd39..25cc9016b1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024
+ var executorCores = 1
+ var numExecutors = 2
parseArgs(args.toList)
@@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
var args = inputArgs
while (! args.isEmpty) {
-
+ // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
+ // the properties with executor in their names are preferred.
args match {
case ("--jar") :: value :: tail =>
userJar = value
@@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ numExecutors = value
args = tail
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
+ executorCores = value
args = tail
case Nil =>
@@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
System.exit(exitCode)
}
}
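
The parser above accepts the old and the new spelling of each flag in a single case alternative, so existing launch scripts keep working after the rename. A self-contained script-style sketch of that pattern, with IntParam re-declared locally as a stand-in for Spark's private extractor of the same name:

    object IntParam {
      def unapply(s: String): Option[Int] = scala.util.Try(s.toInt).toOption
    }

    def parse(inputArgs: List[String]): (Int, Int, Int) = {
      var numExecutors   = 2
      var executorMemory = 1024
      var executorCores  = 1
      var args = inputArgs
      while (args.nonEmpty) {
        args match {
          case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
            numExecutors = value; args = tail
          case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
            executorMemory = value; args = tail
          case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
            executorCores = value; args = tail
          case _ =>
            args = args.drop(1) // skip anything this sketch does not model
        }
      }
      (numExecutors, executorMemory, executorCores)
    }

    // Old and new spellings parse identically:
    // parse(List("--num-workers", "4"))   == (4, 1024, 1)
    // parse(List("--num-executors", "4")) == (4, 1024, 1)
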
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1f894a677d..a001060cdb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024 // MB
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024 // MB
+ var executorCores = 1
+ var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
@@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgsBuffer += value
args = tail
- case ("--master-class") :: value :: tail =>
+ case ("--master-class" | "--am-class") :: value :: tail =>
+ if (args(0) == "--master-class") {
+ println("--master-class is deprecated. Use --am-class instead.")
+ }
amClass = value
args = tail
- case ("--master-memory") :: MemoryParam(value) :: tail =>
+ case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--master-memory") {
+ println("--master-memory is deprecated. Use --driver-memory instead.")
+ }
amMemory = value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ if (args(0) == "--num-workers") {
+ println("--num-workers is deprecated. Use --num-executors instead.")
+ }
+ numExecutors = value
args = tail
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--worker-memory") {
+ println("--worker-memory is deprecated. Use --executor-memory instead.")
+ }
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail =>
+ if (args(0) == "--worker-cores") {
+ println("--worker-cores is deprecated. Use --executor-cores instead.")
+ }
+ executorCores = value
args = tail
case ("--queue") :: value :: tail =>
@@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
- " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
+ " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 74c5e0f18e..57e5761cba 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -73,10 +73,10 @@ trait ClientBase extends Logging {
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
- (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
+ (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
+ (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
).foreach { case(cond, errStr) =>
if (cond) {
@@ -95,9 +95,9 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// If we have requested more then the clusters max for a single resource then exit.
- if (args.workerMemory > maxMem) {
- logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
- format(args.workerMemory, maxMem))
+ if (args.executorMemory > maxMem) {
+ logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
+ format(args.executorMemory, maxMem))
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -276,7 +276,7 @@ trait ClientBase extends Logging {
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
- // Set the environment variables to be passed on to the Workers.
+ // Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -360,9 +360,9 @@ trait ClientBase extends Logging {
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
- " --worker-memory " + args.workerMemory +
- " --worker-cores " + args.workerCores +
- " --num-workers " + args.numWorkers +
+ " --executor-memory " + args.executorMemory +
+ " --executor-cores " + args.executorCores +
+ " --num-executors " + args.numExecutors +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 535abbfb7f..68cda0f1c9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Add a resource to the list of distributed cache resources. This list can
- * be sent to the ApplicationMaster and possibly the workers so that it can
+ * be sent to the ApplicationMaster and possibly the executors so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
- * the stats of the resources to they can be sent to the workers and verified.
+ * the stats of the resources to they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index bfa8f84bf7..da0a6f74ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging}
import org.apache.hadoop.yarn.conf.YarnConfiguration
-trait WorkerRunnableUtil extends Logging {
+trait ExecutorRunnableUtil extends Logging {
val yarnConf: YarnConfiguration
val sparkConf: SparkConf
@@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging {
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int) = {
+ executorMemory: Int,
+ executorCores: Int) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+ val executorMemoryString = executorMemory + "m"
+ JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
@@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging {
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+ // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
@@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging {
masterAddress + " " +
slaveId + " " +
hostname + " " +
- workerCores +
+ executorCores +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9ad7..6b91e6b9eb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
/**
*
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
@@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
- // The yarn application is running, but the worker might not yet ready
+ // The yarn application is running, but the executor might not yet ready
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e7130d2407..d1f13e3c36 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
"--class", "notused",
"--jar", null,
"--args", hostport,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+ "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
// process any optional arguments, use the defaults already defined in ClientArguments
// if things aren't specified
- Map("--master-memory" -> "SPARK_MASTER_MEMORY",
- "--num-workers" -> "SPARK_WORKER_INSTANCES",
- "--worker-memory" -> "SPARK_WORKER_MEMORY",
- "--worker-cores" -> "SPARK_WORKER_CORES",
- "--queue" -> "SPARK_YARN_QUEUE",
- "--name" -> "SPARK_YARN_APP_NAME",
- "--files" -> "SPARK_YARN_DIST_FILES",
- "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
- .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+ Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
+ "SPARK_DRIVER_MEMORY" -> "--driver-memory",
+ "SPARK_WORKER_INSTANCES" -> "--num-executors",
+ "SPARK_WORKER_MEMORY" -> "--executor-memory",
+ "SPARK_WORKER_CORES" -> "--executor-cores",
+ "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
+ "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
+ "SPARK_EXECUTOR_CORES" -> "--executor-cores",
+ "SPARK_YARN_QUEUE" -> "--queue",
+ "SPARK_YARN_APP_NAME" -> "--name",
+ "SPARK_YARN_DIST_FILES" -> "--files",
+ "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
+ .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
@@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
def waitForApp() {
- // TODO : need a better way to find out whether the workers are ready or not
+ // TODO : need a better way to find out whether the executors are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)
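
YarnClientSchedulerBackend now maps both the old SPARK_WORKER_* and the new SPARK_EXECUTOR_* environment variables onto the new executor flags. A script-style sketch of that translation; addArg and the hard-coded env map are illustrative stand-ins for the real helpers, which read the process environment:

    import scala.collection.mutable.ArrayBuffer

    val env = Map("SPARK_WORKER_INSTANCES" -> "4", "SPARK_EXECUTOR_MEMORY" -> "2g")
    val argsArrayBuf = new ArrayBuffer[String]()

    def addArg(optName: String, envVar: String, buf: ArrayBuffer[String]): Unit =
      env.get(envVar).foreach { value => buf += optName += value }

    Map("SPARK_WORKER_INSTANCES"   -> "--num-executors",
        "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
        "SPARK_WORKER_MEMORY"      -> "--executor-memory",
        "SPARK_EXECUTOR_MEMORY"    -> "--executor-memory",
        "SPARK_WORKER_CORES"       -> "--executor-cores",
        "SPARK_EXECUTOR_CORES"     -> "--executor-cores")
      .foreach { case (envVar, optName) => addArg(optName, envVar, argsArrayBuf) }

    println(argsArrayBuf.mkString(" ")) // e.g. --num-executors 4 --executor-memory 2g (order may vary)
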
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 57d1577429..30735cbfdf 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // This need to happen before allocateWorkers()
+ // This need to happen before allocateExecutors()
private def waitForSparkContextInitialized() {
logInfo("Waiting for Spark context initialization")
try {
@@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
+ yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
@@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
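
The reporter thread in the stable ApplicationMaster requests only the executors that are neither running nor already pending. A script-style sketch of that arithmetic with made-up counts:

    import java.util.concurrent.atomic.AtomicInteger

    val targetExecutors     = 10                    // args.numExecutors
    val numExecutorsRunning = new AtomicInteger(6)  // yarnAllocator.getNumExecutorsRunning
    val numPendingAllocate  = new AtomicInteger(1)  // yarnAllocator.getNumPendingAllocate

    val missingExecutorCount =
      targetExecutors - numExecutorsRunning.get() - numPendingAllocate.get()

    if (missingExecutorCount > 0) {
      println(s"Allocating $missingExecutorCount containers to make up for (potentially) lost containers")
      numPendingAllocate.addAndGet(missingExecutorCount) // stands in for addResourceRequests
    }
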
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f1c1fea0b5..b697f10391 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
@@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
preferredNodeLocationData,
sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
+ yarnAllocator.addResourceRequests(args.numExecutors)
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
@@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ab4a79be70..53c403f7d0 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
@@ -78,9 +78,9 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
- logInfo("Setting up worker with commands: " + commands)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 1ac61124cb..e31c4060e8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
def getNumPendingAllocate: Int = numPendingAllocate.intValue
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def releaseContainer(container: Container) {
@@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to-be-released: %s
Cluster resources: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
allocateResponse.getAvailableResources))
@@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers.
for (container <- allocatedContainersToProcess) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- assert(container.getResource.getMemory >= workerMemoryOverhead)
+ val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ assert(container.getResource.getMemory >= executorMemoryOverhead)
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releaseContainer(container)
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
} else {
- val workerId = workerIdCounter.incrementAndGet().toString
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
- logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname))
- val workerRunnable = new WorkerRunnable(
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+ val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
- workerId,
- workerHostname,
- workerMemory,
- workerCores)
- new Thread(workerRunnable).start()
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores)
+ new Thread(executorRunnable).start()
}
}
logDebug("""
Finished allocating %s containers (from %s originally).
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainersToProcess,
allocatedContainers,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
- // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+ // Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- def addResourceRequests(numWorkers: Int) {
+ def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY).toList
} else {
- // Request for all hosts in preferred nodes and for numWorkers -
+ // Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests = createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
amClient.addContainerRequest(request)
}
- if (numWorkers > 0) {
- numPendingAllocate.addAndGet(numWorkers)
- logInfo("Will Allocate %d worker containers, each with %d memory".format(
- numWorkers,
- (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+ if (numExecutors > 0) {
+ numPendingAllocate.addAndGet(numExecutors)
+ logInfo("Will Allocate %d executor containers, each with %d memory".format(
+ numExecutors,
+ (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
} else {
logDebug("Empty allocation request ...")
}
@@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequests(
requestType: AllocationType.AllocationType,
resource: String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
@@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
val nodeLocal = constructContainerRequests(
Array(hostname),
racks = null,
- numWorkers,
+ numExecutors,
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+ constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
}
case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numWorkers, priority)
+ hosts = null, racks = null, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
private def constructContainerRequests(
hosts: Array[String],
racks: Array[String],
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val resource = Resource.newInstance(memoryRequest, workerCores)
+ val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numWorkers) {
+ for (i <- 0 until numExecutors) {
requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
@@ -574,9 +574,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -596,9 +596,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToSplitCount,
rackToSplitCount,
sparkConf)
@@ -608,9 +608,9 @@ object YarnAllocationHandler {
conf: Configuration,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf
): YarnAllocationHandler = {
@@ -619,9 +619,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)
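
For reference, the stable allocator sizes each container request at executor memory plus a fixed overhead, one request per executor, as in constructContainerRequests above. A pure-Scala sketch of that bookkeeping; RequestSketch stands in for YARN's ContainerRequest and the 384 MB overhead is an assumption:

    case class RequestSketch(memoryMb: Int, cores: Int, priority: Int)

    val executorMemory = 1024
    val executorCores  = 1
    val memoryOverhead = 384
    val numExecutors   = 3

    val requests: Seq[RequestSketch] =
      (0 until numExecutors).map(_ =>
        RequestSketch(executorMemory + memoryOverhead, executorCores, priority = 1))

    println(s"Will Allocate ${requests.size} executor containers, each with ${executorMemory + memoryOverhead} memory")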