diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-03-13 12:11:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-13 12:11:33 -0700 |
commit | 698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch) | |
tree | a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/stable | |
parent | e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff) | |
download | spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.gz spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.bz2 spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.zip |
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>
Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:
5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/stable')
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38 | ||||
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 26 | ||||
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14 | ||||
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 138 |
4 files changed, 108 insertions, 108 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 57d1577429..30735cbfdf 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ - // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3)) + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false @@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Call this to force generation of secret so it gets populated into the // hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the worker containers. + // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) // Start the user's JAR @@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } // Allocate all containers - allocateWorkers() + allocateExecutors() // Wait for the user class to Finish userThread.join() @@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, t } - // This need to happen before allocateWorkers() + // This need to happen before allocateExecutors() private def waitForSparkContextInitialized() { logInfo("Waiting for Spark context initialization") try { @@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def allocateWorkers() { + private def allocateExecutors() { try { - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numWorkers) + yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. - while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) @@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") // Launch a progress reporter thread, else the app will get killed after expiration // (def: 10mins) timeout. @@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) } sendProgress() Thread.sleep(sleepTime) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index f1c1fea0b5..b697f10391 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) +class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = @@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar waitForSparkMaster() // Allocate all containers - allocateWorkers() + allocateExecutors() // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. @@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } - private def allocateWorkers() { + private def allocateExecutors() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = @@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar preferredNodeLocationData, sparkConf) - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numWorkers) - while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) { + yarnAllocator.addResourceRequests(args.numExecutors) + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { yarnAllocator.allocateResources() Thread.sleep(100) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") } @@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val t = new Thread { override def run() { while (!driverClosed) { - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) } sendProgress() Thread.sleep(sleepTime) @@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } -object WorkerLauncher { +object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new WorkerLauncher(args).run() + new ExecutorLauncher(args).run() } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ab4a79be70..53c403f7d0 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} import org.apache.spark.{SparkConf, Logging} -class WorkerRunnable( +class ExecutorRunnable( container: Container, conf: Configuration, spConf: SparkConf, masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) - extends Runnable with WorkerRunnableUtil with Logging { + executorMemory: Int, + executorCores: Int) + extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ @@ -55,7 +55,7 @@ class WorkerRunnable( val yarnConf: YarnConfiguration = new YarnConfiguration(conf) def run = { - logInfo("Starting Worker Container") + logInfo("Starting Executor Container") nmClient = NMClient.createNMClient() nmClient.init(yarnConf) nmClient.start() @@ -78,9 +78,9 @@ class WorkerRunnable( credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData())) - val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores) + val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores) - logInfo("Setting up worker with commands: " + commands) + logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) // Send the start request to the ContainerManager diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 1ac61124cb..e31c4060e8 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler( val conf: Configuration, val amClient: AMRMClient[ContainerRequest], val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, - val workerMemory: Int, - val workerCores: Int, + val maxExecutors: Int, + val executorMemory: Int, + val executorCores: Int, val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) @@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler( // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. private val numPendingAllocate = new AtomicInteger() - private val numWorkersRunning = new AtomicInteger() - // Used to generate a unique id per worker - private val workerIdCounter = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() - private val numWorkersFailed = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() def getNumPendingAllocate: Int = numPendingAllocate.intValue - def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumWorkersFailed: Int = numWorkersFailed.intValue + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } def releaseContainer(container: Container) { @@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler( logDebug(""" Allocated containers: %d - Current worker count: %d + Current executor count: %d Containers released: %s Containers to-be-released: %s Cluster resources: %s """.format( allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers, allocateResponse.getAvailableResources)) @@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler( // Run each of the allocated containers. for (container <- allocatedContainersToProcess) { - val numWorkersRunningNow = numWorkersRunning.incrementAndGet() - val workerHostname = container.getNodeId.getHost + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost val containerId = container.getId - val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) - assert(container.getResource.getMemory >= workerMemoryOverhead) + val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + assert(container.getResource.getMemory >= executorMemoryOverhead) - if (numWorkersRunningNow > maxWorkers) { + if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, workerHostname)) + containers for it.""".format(containerId, executorHostname)) releaseContainer(container) - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() } else { - val workerId = workerIdCounter.incrementAndGet().toString + val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) + logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) // To be safe, remove the container from `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) + val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId - allocatedContainerToHostMap.put(containerId, workerHostname) + allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } - logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname)) - val workerRunnable = new WorkerRunnable( + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname)) + val executorRunnable = new ExecutorRunnable( container, conf, sparkConf, driverUrl, - workerId, - workerHostname, - workerMemory, - workerCores) - new Thread(workerRunnable).start() + executorId, + executorHostname, + executorMemory, + executorCores) + new Thread(executorRunnable).start() } } logDebug(""" Finished allocating %s containers (from %s originally). - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( allocatedContainersToProcess, allocatedContainers, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler( // `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) } else { - // Decrement the number of workers running. The next iteration of the ApplicationMaster's + // Decrement the number of executors running. The next iteration of the ApplicationMaster's // reporting thread will take care of allocating. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId, completedContainer.getState, @@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler( // now I think its ok as none of the containers are expected to exit if (completedContainer.getExitStatus() != 0) { logInfo("Container marked as failed: " + containerId) - numWorkersFailed.incrementAndGet() + numExecutorsFailed.incrementAndGet() } } @@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler( } logDebug(""" Finished processing %d completed containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( completedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler( retval } - def addResourceRequests(numWorkers: Int) { + def addResourceRequests(numExecutors: Int) { val containerRequests: List[ContainerRequest] = - if (numWorkers <= 0 || preferredHostToCount.isEmpty) { - logDebug("numWorkers: " + numWorkers + ", host preferences: " + + if (numExecutors <= 0 || preferredHostToCount.isEmpty) { + logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY).toList } else { - // Request for all hosts in preferred nodes and for numWorkers - + // Request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) for ((candidateHost, candidateCount) <- preferredHostToCount) { @@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests = createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY) val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( @@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler( amClient.addContainerRequest(request) } - if (numWorkers > 0) { - numPendingAllocate.addAndGet(numWorkers) - logInfo("Will Allocate %d worker containers, each with %d memory".format( - numWorkers, - (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) + if (numExecutors > 0) { + numPendingAllocate.addAndGet(numExecutors) + logInfo("Will Allocate %d executor containers, each with %d memory".format( + numExecutors, + (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) } else { logDebug("Empty allocation request ...") } @@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequests( requestType: AllocationType.AllocationType, resource: String, - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { @@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler( val nodeLocal = constructContainerRequests( Array(hostname), racks = null, - numWorkers, + numExecutors, priority) // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. @@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler( } case AllocationType.RACK => { val rack = resource - constructContainerRequests(hosts = null, Array(rack), numWorkers, priority) + constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) } case AllocationType.ANY => constructContainerRequests( - hosts = null, racks = null, numWorkers, priority) + hosts = null, racks = null, numExecutors, priority) case _ => throw new IllegalArgumentException( "Unexpected/unsupported request type: " + requestType) } @@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler( private def constructContainerRequests( hosts: Array[String], racks: Array[String], - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - val resource = Resource.newInstance(memoryRequest, workerCores) + val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val resource = Resource.newInstance(memoryRequest, executorCores) val prioritySetting = Records.newRecord(classOf[Priority]) prioritySetting.setPriority(priority) val requests = new ArrayBuffer[ContainerRequest]() - for (i <- 0 until numWorkers) { + for (i <- 0 until numExecutors) { requests += new ContainerRequest(resource, hosts, racks, prioritySetting) } requests @@ -574,9 +574,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, Map[String, Int](), Map[String, Int](), sparkConf) @@ -596,9 +596,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, hostToSplitCount, rackToSplitCount, sparkConf) @@ -608,9 +608,9 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - maxWorkers: Int, - workerMemory: Int, - workerCores: Int, + maxExecutors: Int, + executorMemory: Int, + executorCores: Int, map: collection.Map[String, collection.Set[SplitInfo]], sparkConf: SparkConf ): YarnAllocationHandler = { @@ -619,9 +619,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - maxWorkers, - workerMemory, - workerCores, + maxExecutors, + executorMemory, + executorCores, hostToCount, rackToCount, sparkConf) |