diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-03-13 12:11:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-13 12:11:33 -0700 |
commit | 698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch) | |
tree | a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/alpha | |
parent | e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff) | |
download | spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.gz spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.bz2 spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.zip |
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>
Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:
5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38 | ||||
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 28 | ||||
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14 | ||||
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 124 |
4 files changed, 102 insertions, 102 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 87785cdc60..910484ed54 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3)) + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false @@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Call this to force generation of secret so it gets populated into the // hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the worker containers. + // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) // Start the user's JAR @@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } // Allocate all containers - allocateWorkers() + allocateExecutors() // Wait for the user class to Finish userThread.join() @@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, t } - // this need to happen before allocateWorkers + // this need to happen before allocateExecutors private def waitForSparkContextInitialized() { logInfo("Waiting for spark context initialization") try { @@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def allocateWorkers() { + private def allocateExecutors() { try { - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure // Exists the loop if the user thread exits. - while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } yarnAllocator.allocateContainers( - math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") // Launch a progress reporter thread, else the app will get killed after expiration // (def: 10mins) timeout. @@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - if (missingWorkerCount > 0) { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.allocateContainers(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.allocateContainers(missingExecutorCount) } else sendProgress() Thread.sleep(sleepTime) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b735d01df8..7b0e020263 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) +class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) @@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() if (minimumMemory > 0) { - val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { @@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar waitForSparkMaster() // Allocate all containers - allocateWorkers() + allocateExecutors() // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. @@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } - private def allocateWorkers() { + private def allocateExecutors() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = @@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData, sparkConf) - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) { - yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) Thread.sleep(100) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") } @@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val t = new Thread { override def run() { while (!driverClosed) { - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - if (missingWorkerCount > 0) { - logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") - yarnAllocator.allocateContainers(missingWorkerCount) + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning + if (missingExecutorCount > 0) { + logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers") + yarnAllocator.allocateContainers(missingExecutorCount) } else sendProgress() Thread.sleep(sleepTime) @@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } -object WorkerLauncher { +object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new WorkerLauncher(args).run() + new ExecutorLauncher(args).run() } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 8c686e393f..981e8b05f6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} import org.apache.spark.{SparkConf, Logging} -class WorkerRunnable( +class ExecutorRunnable( container: Container, conf: Configuration, spConf: SparkConf, masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) - extends Runnable with WorkerRunnableUtil with Logging { + executorMemory: Int, + executorCores: Int) + extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) var cm: ContainerManager = _ @@ -55,7 +55,7 @@ class WorkerRunnable( val yarnConf: YarnConfiguration = new YarnConfiguration(conf) def run = { - logInfo("Starting Worker Container") + logInfo("Starting Executor Container") cm = connectToCM startContainer } @@ -81,8 +81,8 @@ class WorkerRunnable( credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores) - logInfo("Setting up worker with commands: " + commands) + val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores) + logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) // Send the start request to the ContainerManager diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index e91257be8e..2056667af5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler( val conf: Configuration, val resourceManager: AMRMProtocol, val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, - val workerMemory: Int, - val workerCores: Int, + val maxExecutors: Int, + val executorMemory: Int, + val executorCores: Int, val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) @@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] - private val numWorkersRunning = new AtomicInteger() - // Used to generate a unique id per worker - private val workerIdCounter = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() - private val numWorkersFailed = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() - def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumWorkersFailed: Int = numWorkersFailed.intValue + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } - def allocateContainers(workersToRequest: Int) { + def allocateContainers(executorsToRequest: Int) { // We need to send the request only once from what I understand ... but for now, not modifying // this much. // Keep polling the Resource Manager for containers - val amResp = allocateWorkerResources(workersToRequest).getAMResponse + val amResp = allocateExecutorResources(executorsToRequest).getAMResponse val _allocatedContainers = amResp.getAllocatedContainers() if (_allocatedContainers.size > 0) { logDebug(""" Allocated containers: %d - Current worker count: %d + Current executor count: %d Containers released: %s Containers to be released: %s Cluster resources: %s """.format( _allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers, amResp.getAvailableResources)) @@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler( // Run each of the allocated containers for (container <- allocatedContainers) { - val numWorkersRunningNow = numWorkersRunning.incrementAndGet() - val workerHostname = container.getNodeId.getHost + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost val containerId = container.getId assert( - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) - if (numWorkersRunningNow > maxWorkers) { + if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, workerHostname)) + containers for it.""".format(containerId, executorHostname)) releasedContainerList.add(containerId) // reset counter back to old value. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() } else { // Deallocate + allocate can result in reusing id's wrongly - so use a different counter - // (workerIdCounter) - val workerId = workerIdCounter.incrementAndGet().toString + // (executorIdCounter) + val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - logInfo("launching container on " + containerId + " host " + workerHostname) + logInfo("launching container on " + containerId + " host " + executorHostname) // Just to be safe, simply remove it from pendingReleaseContainers. // Should not be there, but .. pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) + val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId - allocatedContainerToHostMap.put(containerId, workerHostname) + allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } new Thread( - new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId, - workerHostname, workerMemory, workerCores) + new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId, + executorHostname, executorMemory, executorCores) ).start() } } logDebug(""" Finished processing %d containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler( } else { // Simply decrement count - next iteration of ReporterThread will take care of allocating. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId, completedContainer.getState, @@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler( // now I think its ok as none of the containers are expected to exit if (completedContainer.getExitStatus() != 0) { logInfo("Container marked as failed: " + containerId) - numWorkersFailed.incrementAndGet() + numExecutorsFailed.incrementAndGet() } } @@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler( } logDebug(""" Finished processing %d completed containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( completedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler( retval } - private def allocateWorkerResources(numWorkers: Int): AllocateResponse = { + private def allocateExecutorResources(numExecutors: Int): AllocateResponse = { var resourceRequests: List[ResourceRequest] = null // default. - if (numWorkers <= 0 || preferredHostToCount.isEmpty) { - logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty) + if (numExecutors <= 0 || preferredHostToCount.isEmpty) { + logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) resourceRequests = List( - createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)) + createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY)) } else { - // request for all hosts in preferred nodes and for numWorkers - + // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](preferredHostToCount.size) @@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests: ResourceRequest = createResourceRequest( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( @@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler( val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - if (numWorkers > 0) { - logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers, - workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + if (numExecutors > 0) { + logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors, + executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) @@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequest( requestType: AllocationType.AllocationType, resource:String, - numWorkers: Int, + numExecutors: Int, priority: Int): ResourceRequest = { // If hostname specified, we need atleast two requests - node local and rack local. @@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler( case AllocationType.HOST => { assert(YarnAllocationHandler.ANY_HOST != resource) val hostname = resource - val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority) + val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority) // Add to host->rack mapping YarnAllocationHandler.populateRackInfo(conf, hostname) @@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler( } case AllocationType.RACK => { val rack = resource - createResourceRequestImpl(rack, numWorkers, priority) + createResourceRequestImpl(rack, numExecutors, priority) } case AllocationType.ANY => createResourceRequestImpl( - YarnAllocationHandler.ANY_HOST, numWorkers, priority) + YarnAllocationHandler.ANY_HOST, numExecutors, priority) case _ => throw new IllegalArgumentException( "Unexpected/unsupported request type: " + requestType) } @@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequestImpl( hostname:String, - numWorkers: Int, + numExecutors: Int, priority: Int): ResourceRequest = { val rsrcRequest = Records.newRecord(classOf[ResourceRequest]) val memCapability = Records.newRecord(classOf[Resource]) // There probably is some overhead here, let's reserve a bit more memory. - memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) rsrcRequest.setCapability(memCapability) val pri = Records.newRecord(classOf[Priority]) @@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler( rsrcRequest.setHostName(hostname) - rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0)) + rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0)) rsrcRequest } @@ -560,9 +560,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, Map[String, Int](), Map[String, Int](), sparkConf) @@ -582,9 +582,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, hostToCount, rackToCount, sparkConf) @@ -594,9 +594,9 @@ object YarnAllocationHandler { conf: Configuration, resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - maxWorkers: Int, - workerMemory: Int, - workerCores: Int, + maxExecutors: Int, + executorMemory: Int, + executorCores: Int, map: collection.Map[String, collection.Set[SplitInfo]], sparkConf: SparkConf): YarnAllocationHandler = { @@ -606,9 +606,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - maxWorkers, - workerMemory, - workerCores, + maxExecutors, + executorMemory, + executorCores, hostToCount, rackToCount, sparkConf) |