author     Sandy Ryza <sandy@cloudera.com>        2014-03-13 12:11:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-03-13 12:11:33 -0700
commit     698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch)
tree       a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/alpha
parent     e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff)
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 28
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 124
4 files changed, 102 insertions(+), 102 deletions(-)
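
The only behavioral subtlety hiding in the rename is configuration compatibility: the ApplicationMaster hunk below reads the new spark.yarn.max.executor.failures key first and falls back to the deprecated spark.yarn.max.worker.failures key, with max(numExecutors * 2, 3) as the final default. A minimal, self-contained sketch of that fallback pattern (the Conf class below is a hypothetical stand-in for SparkConf; only getInt(key, default) matters):

// Sketch of the backward-compatible config lookup used in the ApplicationMaster
// hunk below. Conf is a hypothetical stand-in for SparkConf.
object ConfigFallbackSketch {
  class Conf(settings: Map[String, String]) {
    def getInt(key: String, default: Int): Int =
      settings.get(key).map(_.toInt).getOrElse(default)
  }

  def maxExecutorFailures(conf: Conf, numExecutors: Int): Int =
    // Prefer the new key; fall back to the deprecated key; finally use the
    // computed default of max(numExecutors * 2, 3).
    conf.getInt("spark.yarn.max.executor.failures",
      conf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

  def main(args: Array[String]): Unit = {
    println(maxExecutorFailures(new Conf(Map.empty), numExecutors = 4))                     // 8
    println(maxExecutorFailures(new Conf(Map("spark.yarn.max.worker.failures" -> "7")), 4)) // 7
  }
}
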
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 87785cdc60..910484ed54 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // this need to happen before allocateWorkers
+ // this need to happen before allocateExecutors
private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
try {
@@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
// Exists the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateContainers(
- math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.allocateContainers(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
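
Beyond the renames, both loops in this file follow the same pattern: poll the allocator until the requested executor count is running, fail the application once executor failures reach maxNumExecutorFailures, and on each pass ask only for the executors still missing. A simplified, self-contained sketch of that loop (the Allocator trait and the error call are hypothetical stand-ins for YarnAllocationHandler and finishApplicationMaster):

// Simplified sketch of the allocateExecutors() polling loop shown above.
object AllocationLoopSketch {
  trait Allocator {
    def getNumExecutorsRunning: Int
    def getNumExecutorsFailed: Int
    def allocateContainers(count: Int): Unit
  }

  def allocateExecutors(
      allocator: Allocator,
      numExecutors: Int,
      maxExecutorFailures: Int,
      stillRunning: () => Boolean): Unit = {
    while (allocator.getNumExecutorsRunning < numExecutors && stillRunning()) {
      if (allocator.getNumExecutorsFailed >= maxExecutorFailures) {
        // The real code calls finishApplicationMaster(FinalApplicationStatus.FAILED, ...).
        sys.error("max number of executor failures reached")
      }
      // Only request the executors we are still missing.
      allocator.allocateContainers(
        math.max(numExecutors - allocator.getNumExecutorsRunning, 0))
      Thread.sleep(100)
    }
  }
}
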
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b735d01df8..7b0e020263 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
@@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
@@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
args, preferredNodeLocationData, sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
- yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingWorkerCount)
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
@@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
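
One hunk above (around getMinimumResourceCapability) rounds the requested executor memory up to a whole number of the cluster's minimum resource units; the divide-plus-remainder expression is an integer ceiling division. A small sketch of that rounding, with an assumed MEMORY_OVERHEAD value used purely for illustration:

// Sketch of the rounding in the ExecutorLauncher hunk above: how many
// minimum-size resource units cover executorMemory + MEMORY_OVERHEAD.
object MemoryRoundingSketch {
  val MEMORY_OVERHEAD = 384 // MB, assumed for the example

  def unitsNeeded(executorMemory: Int, minimumMemory: Int): Int = {
    val mem = executorMemory + MEMORY_OVERHEAD
    // Integer ceiling division: add one unit if there is any remainder.
    (mem / minimumMemory) + (if (mem % minimumMemory != 0) 1 else 0)
  }

  def main(args: Array[String]): Unit = {
    // 1024 MB executor + 384 MB overhead = 1408 MB -> 2 units of 1024 MB each.
    println(unitsNeeded(executorMemory = 1024, minimumMemory = 1024)) // 2
  }
}
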
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8c686e393f..981e8b05f6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
cm = connectToCM
startContainer
}
@@ -81,8 +81,8 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
- logInfo("Setting up worker with commands: " + commands)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e91257be8e..2056667af5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
- def allocateContainers(workersToRequest: Int) {
+ def allocateContainers(executorsToRequest: Int) {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
// Keep polling the Resource Manager for containers
- val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+ val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
if (_allocatedContainers.size > 0) {
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to be released: %s
Cluster resources: %s
""".format(
_allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
amResp.getAvailableResources))
@@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers
for (container <- allocatedContainers) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
assert(
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
}
else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
- // (workerIdCounter)
- val workerId = workerIdCounter.incrementAndGet().toString
+ // (executorIdCounter)
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("launching container on " + containerId + " host " + workerHostname)
+ logInfo("launching container on " + containerId + " host " + executorHostname)
// Just to be safe, simply remove it from pendingReleaseContainers.
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
new Thread(
- new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
- workerHostname, workerMemory, workerCores)
+ new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
+ executorHostname, executorMemory, executorCores)
).start()
}
}
logDebug("""
Finished processing %d containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
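
The hunk above is the core container-handling loop: each newly allocated container is either handed back (when the running count already meets maxExecutors, undoing the optimistic increment) or assigned a fresh executor id and launched as an ExecutorRunnable on its own thread. A stripped-down sketch of that accept/reject decision, with a plain case class standing in for the YARN Container record and a callback standing in for ExecutorRunnable:

import java.util.concurrent.atomic.AtomicInteger

// Stripped-down sketch of the accept/reject decision in the hunk above.
// Container and the launch callback are hypothetical stand-ins for the YARN
// Container record and ExecutorRunnable.
object ContainerHandlingSketch {
  final case class Container(id: String, host: String)

  class Handler(maxExecutors: Int, launch: Container => Unit) {
    private val numExecutorsRunning = new AtomicInteger()
    private val executorIdCounter = new AtomicInteger()

    def handle(container: Container): Unit = {
      val runningNow = numExecutorsRunning.incrementAndGet()
      if (runningNow > maxExecutors) {
        // Already at capacity: give the container back and undo the increment.
        println(s"Ignoring container ${container.id} at host ${container.host}")
        numExecutorsRunning.decrementAndGet()
      } else {
        // Use a dedicated counter for executor ids so releasing and re-allocating
        // containers never causes id reuse.
        val executorId = executorIdCounter.incrementAndGet().toString
        println(s"Launching executor $executorId in ${container.id} on host ${container.host}")
        launch(container)
      }
    }
  }
}
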
@@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler(
}
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler(
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+ private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
// default.
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+ createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
- // request for all hosts in preferred nodes and for numWorkers -
+ // request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
@@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler(
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
- if (numWorkers > 0) {
- logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
- workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ if (numExecutors > 0) {
+ logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+ executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
@@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
requestType: AllocationType.AllocationType,
resource:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
// If hostname specified, we need atleast two requests - node local and rack local.
@@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler(
case AllocationType.HOST => {
assert(YarnAllocationHandler.ANY_HOST != resource)
val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
+ val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
YarnAllocationHandler.populateRackInfo(conf, hostname)
@@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- createResourceRequestImpl(rack, numWorkers, priority)
+ createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+ YarnAllocationHandler.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequestImpl(
hostname:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
@@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
rsrcRequest.setHostName(hostname)
- rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
+ rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
rsrcRequest
}
@@ -560,9 +560,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -582,9 +582,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToCount,
rackToCount,
sparkConf)
@@ -594,9 +594,9 @@ object YarnAllocationHandler {
conf: Configuration,
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf): YarnAllocationHandler = {
@@ -606,9 +606,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)
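
Finally, the createResourceRequest* hunks show how each request is sized: every container asks for executorMemory plus a fixed MEMORY_OVERHEAD, and the container count is clamped at zero. A self-contained sketch of that construction, with a plain case class standing in for the YARN ResourceRequest record and an assumed overhead value:

// Sketch of the request sizing in createResourceRequestImpl above. The case
// class is a hypothetical stand-in for the YARN ResourceRequest record, and
// MEMORY_OVERHEAD is assumed for illustration.
object ResourceRequestSketch {
  val MEMORY_OVERHEAD = 384 // MB, assumed

  final case class ResourceRequest(
      hostName: String,
      memory: Int,
      numContainers: Int,
      priority: Int)

  def createResourceRequest(
      hostname: String,
      executorMemory: Int,
      numExecutors: Int,
      priority: Int): ResourceRequest =
    ResourceRequest(
      hostName = hostname,
      // Reserve a bit more than the executor heap, as the comment in the
      // original code notes.
      memory = executorMemory + MEMORY_OVERHEAD,
      // Never send a negative container count.
      numContainers = math.max(numExecutors, 0),
      priority = priority)

  def main(args: Array[String]): Unit =
    println(createResourceRequest("*", executorMemory = 1024, numExecutors = 3, priority = 1))
}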