author      Sandy Ryza <sandy@cloudera.com>        2014-03-13 12:11:33 -0700
committer   Patrick Wendell <pwendell@gmail.com>   2014-03-13 12:11:33 -0700
commit      698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch)
tree        a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/stable
parent      e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff)
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/stable')
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala    38
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala)    26
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala)    14
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala    138
4 files changed, 108 insertions(+), 108 deletions(-)
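The one change below that goes beyond a mechanical rename is the config lookup in ApplicationMaster.scala: the new spark.yarn.max.executor.failures key takes precedence, falling back to the deprecated spark.yarn.max.worker.failures key before the numExecutors * 2 (minimum 3) default applies. A minimal sketch of that lookup pattern, using SparkConf.getInt(key, default); the object and method names here are illustrative only and not part of the commit:

    import org.apache.spark.SparkConf

    // Illustrative helper (not part of the commit): prefer the new key,
    // fall back to the deprecated key, then to the computed default.
    object MaxExecutorFailures {
      def resolve(sparkConf: SparkConf, numExecutors: Int): Int = {
        val default = math.max(numExecutors * 2, 3)
        sparkConf.getInt("spark.yarn.max.executor.failures",
          sparkConf.getInt("spark.yarn.max.worker.failures", default))
      }
    }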
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 57d1577429..30735cbfdf 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // This need to happen before allocateWorkers()
+ // This need to happen before allocateExecutors()
private def waitForSparkContextInitialized() {
logInfo("Waiting for Spark context initialization")
try {
@@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
+ yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
@@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f1c1fea0b5..b697f10391 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
@@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
preferredNodeLocationData,
sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
+ yarnAllocator.addResourceRequests(args.numExecutors)
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
@@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ab4a79be70..53c403f7d0 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
@@ -78,9 +78,9 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
- logInfo("Setting up worker with commands: " + commands)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 1ac61124cb..e31c4060e8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
def getNumPendingAllocate: Int = numPendingAllocate.intValue
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def releaseContainer(container: Container) {
@@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to-be-released: %s
Cluster resources: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
allocateResponse.getAvailableResources))
@@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers.
for (container <- allocatedContainersToProcess) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- assert(container.getResource.getMemory >= workerMemoryOverhead)
+ val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ assert(container.getResource.getMemory >= executorMemoryOverhead)
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releaseContainer(container)
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
} else {
- val workerId = workerIdCounter.incrementAndGet().toString
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
- logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname))
- val workerRunnable = new WorkerRunnable(
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+ val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
- workerId,
- workerHostname,
- workerMemory,
- workerCores)
- new Thread(workerRunnable).start()
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores)
+ new Thread(executorRunnable).start()
}
}
logDebug("""
Finished allocating %s containers (from %s originally).
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainersToProcess,
allocatedContainers,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
- // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+ // Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- def addResourceRequests(numWorkers: Int) {
+ def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY).toList
} else {
- // Request for all hosts in preferred nodes and for numWorkers -
+ // Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests = createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
amClient.addContainerRequest(request)
}
- if (numWorkers > 0) {
- numPendingAllocate.addAndGet(numWorkers)
- logInfo("Will Allocate %d worker containers, each with %d memory".format(
- numWorkers,
- (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+ if (numExecutors > 0) {
+ numPendingAllocate.addAndGet(numExecutors)
+ logInfo("Will Allocate %d executor containers, each with %d memory".format(
+ numExecutors,
+ (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
} else {
logDebug("Empty allocation request ...")
}
@@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequests(
requestType: AllocationType.AllocationType,
resource: String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
@@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
val nodeLocal = constructContainerRequests(
Array(hostname),
racks = null,
- numWorkers,
+ numExecutors,
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+ constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
}
case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numWorkers, priority)
+ hosts = null, racks = null, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
private def constructContainerRequests(
hosts: Array[String],
racks: Array[String],
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val resource = Resource.newInstance(memoryRequest, workerCores)
+ val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numWorkers) {
+ for (i <- 0 until numExecutors) {
requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
@@ -574,9 +574,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -596,9 +596,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToSplitCount,
rackToSplitCount,
sparkConf)
@@ -608,9 +608,9 @@ object YarnAllocationHandler {
conf: Configuration,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf
): YarnAllocationHandler = {
@@ -619,9 +619,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)