diff options
Diffstat (limited to 'yarn')
15 files changed, 289 insertions, 270 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 87785cdc60..910484ed54 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3)) + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false @@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Call this to force generation of secret so it gets populated into the // hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the worker containers. + // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) // Start the user's JAR @@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } // Allocate all containers - allocateWorkers() + allocateExecutors() // Wait for the user class to Finish userThread.join() @@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, t } - // this need to happen before allocateWorkers + // this need to happen before allocateExecutors private def waitForSparkContextInitialized() { logInfo("Waiting for spark context initialization") try { @@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def allocateWorkers() { + private def allocateExecutors() { try { - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure // Exists the loop if the user thread exits. - while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } yarnAllocator.allocateContainers( - math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") // Launch a progress reporter thread, else the app will get killed after expiration // (def: 10mins) timeout. @@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - if (missingWorkerCount > 0) { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.allocateContainers(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.allocateContainers(missingExecutorCount) } else sendProgress() Thread.sleep(sleepTime) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b735d01df8..7b0e020263 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) +class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) @@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() if (minimumMemory > 0) { - val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { @@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar waitForSparkMaster() // Allocate all containers - allocateWorkers() + allocateExecutors() // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. @@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } - private def allocateWorkers() { + private def allocateExecutors() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = @@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData, sparkConf) - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) { - yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) Thread.sleep(100) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") } @@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val t = new Thread { override def run() { while (!driverClosed) { - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - if (missingWorkerCount > 0) { - logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") - yarnAllocator.allocateContainers(missingWorkerCount) + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning + if (missingExecutorCount > 0) { + logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers") + yarnAllocator.allocateContainers(missingExecutorCount) } else sendProgress() Thread.sleep(sleepTime) @@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } -object WorkerLauncher { +object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new WorkerLauncher(args).run() + new ExecutorLauncher(args).run() } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 8c686e393f..981e8b05f6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} import org.apache.spark.{SparkConf, Logging} -class WorkerRunnable( +class ExecutorRunnable( container: Container, conf: Configuration, spConf: SparkConf, masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) - extends Runnable with WorkerRunnableUtil with Logging { + executorMemory: Int, + executorCores: Int) + extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) var cm: ContainerManager = _ @@ -55,7 +55,7 @@ class WorkerRunnable( val yarnConf: YarnConfiguration = new YarnConfiguration(conf) def run = { - logInfo("Starting Worker Container") + logInfo("Starting Executor Container") cm = connectToCM startContainer } @@ -81,8 +81,8 @@ class WorkerRunnable( credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores) - logInfo("Setting up worker with commands: " + commands) + val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores) + logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) // Send the start request to the ContainerManager diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index e91257be8e..2056667af5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler( val conf: Configuration, val resourceManager: AMRMProtocol, val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, - val workerMemory: Int, - val workerCores: Int, + val maxExecutors: Int, + val executorMemory: Int, + val executorCores: Int, val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) @@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] - private val numWorkersRunning = new AtomicInteger() - // Used to generate a unique id per worker - private val workerIdCounter = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() - private val numWorkersFailed = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() - def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumWorkersFailed: Int = numWorkersFailed.intValue + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } - def allocateContainers(workersToRequest: Int) { + def allocateContainers(executorsToRequest: Int) { // We need to send the request only once from what I understand ... but for now, not modifying // this much. // Keep polling the Resource Manager for containers - val amResp = allocateWorkerResources(workersToRequest).getAMResponse + val amResp = allocateExecutorResources(executorsToRequest).getAMResponse val _allocatedContainers = amResp.getAllocatedContainers() if (_allocatedContainers.size > 0) { logDebug(""" Allocated containers: %d - Current worker count: %d + Current executor count: %d Containers released: %s Containers to be released: %s Cluster resources: %s """.format( _allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers, amResp.getAvailableResources)) @@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler( // Run each of the allocated containers for (container <- allocatedContainers) { - val numWorkersRunningNow = numWorkersRunning.incrementAndGet() - val workerHostname = container.getNodeId.getHost + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost val containerId = container.getId assert( - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) - if (numWorkersRunningNow > maxWorkers) { + if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, workerHostname)) + containers for it.""".format(containerId, executorHostname)) releasedContainerList.add(containerId) // reset counter back to old value. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() } else { // Deallocate + allocate can result in reusing id's wrongly - so use a different counter - // (workerIdCounter) - val workerId = workerIdCounter.incrementAndGet().toString + // (executorIdCounter) + val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - logInfo("launching container on " + containerId + " host " + workerHostname) + logInfo("launching container on " + containerId + " host " + executorHostname) // Just to be safe, simply remove it from pendingReleaseContainers. // Should not be there, but .. pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) + val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId - allocatedContainerToHostMap.put(containerId, workerHostname) + allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } new Thread( - new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId, - workerHostname, workerMemory, workerCores) + new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId, + executorHostname, executorMemory, executorCores) ).start() } } logDebug(""" Finished processing %d containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler( } else { // Simply decrement count - next iteration of ReporterThread will take care of allocating. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId, completedContainer.getState, @@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler( // now I think its ok as none of the containers are expected to exit if (completedContainer.getExitStatus() != 0) { logInfo("Container marked as failed: " + containerId) - numWorkersFailed.incrementAndGet() + numExecutorsFailed.incrementAndGet() } } @@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler( } logDebug(""" Finished processing %d completed containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( completedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler( retval } - private def allocateWorkerResources(numWorkers: Int): AllocateResponse = { + private def allocateExecutorResources(numExecutors: Int): AllocateResponse = { var resourceRequests: List[ResourceRequest] = null // default. - if (numWorkers <= 0 || preferredHostToCount.isEmpty) { - logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty) + if (numExecutors <= 0 || preferredHostToCount.isEmpty) { + logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) resourceRequests = List( - createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)) + createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY)) } else { - // request for all hosts in preferred nodes and for numWorkers - + // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](preferredHostToCount.size) @@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests: ResourceRequest = createResourceRequest( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( @@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler( val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - if (numWorkers > 0) { - logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers, - workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + if (numExecutors > 0) { + logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors, + executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) @@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequest( requestType: AllocationType.AllocationType, resource:String, - numWorkers: Int, + numExecutors: Int, priority: Int): ResourceRequest = { // If hostname specified, we need atleast two requests - node local and rack local. @@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler( case AllocationType.HOST => { assert(YarnAllocationHandler.ANY_HOST != resource) val hostname = resource - val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority) + val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority) // Add to host->rack mapping YarnAllocationHandler.populateRackInfo(conf, hostname) @@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler( } case AllocationType.RACK => { val rack = resource - createResourceRequestImpl(rack, numWorkers, priority) + createResourceRequestImpl(rack, numExecutors, priority) } case AllocationType.ANY => createResourceRequestImpl( - YarnAllocationHandler.ANY_HOST, numWorkers, priority) + YarnAllocationHandler.ANY_HOST, numExecutors, priority) case _ => throw new IllegalArgumentException( "Unexpected/unsupported request type: " + requestType) } @@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequestImpl( hostname:String, - numWorkers: Int, + numExecutors: Int, priority: Int): ResourceRequest = { val rsrcRequest = Records.newRecord(classOf[ResourceRequest]) val memCapability = Records.newRecord(classOf[Resource]) // There probably is some overhead here, let's reserve a bit more memory. - memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) rsrcRequest.setCapability(memCapability) val pri = Records.newRecord(classOf[Priority]) @@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler( rsrcRequest.setHostName(hostname) - rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0)) + rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0)) rsrcRequest } @@ -560,9 +560,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, Map[String, Int](), Map[String, Int](), sparkConf) @@ -582,9 +582,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, hostToCount, rackToCount, sparkConf) @@ -594,9 +594,9 @@ object YarnAllocationHandler { conf: Configuration, resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - maxWorkers: Int, - workerMemory: Int, - workerCores: Int, + maxExecutors: Int, + executorMemory: Int, + executorCores: Int, map: collection.Map[String, collection.Set[SplitInfo]], sparkConf: SparkConf): YarnAllocationHandler = { @@ -606,9 +606,9 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - maxWorkers, - workerMemory, - workerCores, + maxExecutors, + executorMemory, + executorCores, hostToCount, rackToCount, sparkConf) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index f76a5ddd39..25cc9016b1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 + var executorCores = 1 + var numExecutors = 2 parseArgs(args.toList) @@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) { var args = inputArgs while (! args.isEmpty) { - + // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, + // the properties with executor in their names are preferred. args match { case ("--jar") :: value :: tail => userJar = value @@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) { userArgsBuffer += value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + numExecutors = value args = tail - case ("--worker-memory") :: IntParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail => + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => + executorCores = value args = tail case Nil => @@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n") + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n") System.exit(exitCode) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1f894a677d..a001060cdb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 // MB - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 // MB + var executorCores = 1 + var numExecutors = 2 var amQueue = sparkConf.get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" @@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { userArgsBuffer += value args = tail - case ("--master-class") :: value :: tail => + case ("--master-class" | "--am-class") :: value :: tail => + if (args(0) == "--master-class") { + println("--master-class is deprecated. Use --am-class instead.") + } amClass = value args = tail - case ("--master-memory") :: MemoryParam(value) :: tail => + case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--master-memory") { + println("--master-memory is deprecated. Use --driver-memory instead.") + } amMemory = value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + if (args(0) == "--num-workers") { + println("--num-workers is deprecated. Use --num-executors instead.") + } + numExecutors = value args = tail - case ("--worker-memory") :: MemoryParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--worker-memory") { + println("--worker-memory is deprecated. Use --executor-memory instead.") + } + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail => + if (args(0) == "--worker-cores") { + println("--worker-cores is deprecated. Use --executor-cores instead.") + } + executorCores = value args = tail case ("--queue") :: value :: tail => @@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1).\n" + - " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + - " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1).\n" + + " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" + " --name NAME The name of your application (Default: Spark)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 74c5e0f18e..57e5761cba 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -73,10 +73,10 @@ trait ClientBase extends Logging { ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) -> "Error: You must specify a user jar when running in standalone mode!"), (args.userClass == null) -> "Error: You must specify a user class!", - (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!", + (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" + "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD), - (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" + + (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" + "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString) ).foreach { case(cond, errStr) => if (cond) { @@ -95,9 +95,9 @@ trait ClientBase extends Logging { logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) // If we have requested more then the clusters max for a single resource then exit. - if (args.workerMemory > maxMem) { - logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.workerMemory, maxMem)) + if (args.executorMemory > maxMem) { + logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.". + format(args.executorMemory, maxMem)) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -276,7 +276,7 @@ trait ClientBase extends Logging { env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - // Set the environment variables to be passed on to the Workers. + // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -360,9 +360,9 @@ trait ClientBase extends Logging { " --class " + args.userClass + " --jar " + args.userJar + userArgsToString(args) + - " --worker-memory " + args.workerMemory + - " --worker-cores " + args.workerCores + - " --num-workers " + args.numWorkers + + " --executor-memory " + args.executorMemory + + " --executor-cores " + args.executorCores + + " --num-executors " + args.numExecutors + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 535abbfb7f..68cda0f1c9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging { /** * Add a resource to the list of distributed cache resources. This list can - * be sent to the ApplicationMaster and possibly the workers so that it can + * be sent to the ApplicationMaster and possibly the executors so that it can * be downloaded into the Hadoop distributed cache for use by this application. * Adds the LocalResource to the localResources HashMap passed in and saves - * the stats of the resources to they can be sent to the workers and verified. + * the stats of the resources to they can be sent to the executors and verified. * * @param fs FileSystem * @param conf Configuration diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index bfa8f84bf7..da0a6f74ef 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging} import org.apache.hadoop.yarn.conf.YarnConfiguration -trait WorkerRunnableUtil extends Logging { +trait ExecutorRunnableUtil extends Logging { val yarnConf: YarnConfiguration val sparkConf: SparkConf @@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging { masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) = { + executorMemory: Int, + executorCores: Int) = { // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory - val workerMemoryString = workerMemory + "m" - JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " " + val executorMemoryString = executorMemory + "m" + JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } @@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging { val commands = List[String](javaCommand + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in + // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in // an inconsistent state. // TODO: If the OOM is not recoverable by rescheduling it on different node, then do // 'something' to fail job ... akin to blacklisting trackers in mapred ? @@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging { masterAddress + " " + slaveId + " " + hostname + " " + - workerCores + + executorCores + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 522e0a9ad7..6b91e6b9eb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** * - * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. + * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM. */ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { @@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur override def postStartHook() { - // The yarn application is running, but the worker might not yet ready + // The yarn application is running, but the executor might not yet ready // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt Thread.sleep(2000L) logInfo("YarnClientClusterScheduler.postStartHook done") diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e7130d2407..d1f13e3c36 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend( "--class", "notused", "--jar", null, "--args", hostport, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" + "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) // process any optional arguments, use the defaults already defined in ClientArguments // if things aren't specified - Map("--master-memory" -> "SPARK_MASTER_MEMORY", - "--num-workers" -> "SPARK_WORKER_INSTANCES", - "--worker-memory" -> "SPARK_WORKER_MEMORY", - "--worker-cores" -> "SPARK_WORKER_CORES", - "--queue" -> "SPARK_YARN_QUEUE", - "--name" -> "SPARK_YARN_APP_NAME", - "--files" -> "SPARK_YARN_DIST_FILES", - "--archives" -> "SPARK_YARN_DIST_ARCHIVES") - .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + Map("SPARK_MASTER_MEMORY" -> "--driver-memory", + "SPARK_DRIVER_MEMORY" -> "--driver-memory", + "SPARK_WORKER_INSTANCES" -> "--num-executors", + "SPARK_WORKER_MEMORY" -> "--executor-memory", + "SPARK_WORKER_CORES" -> "--executor-cores", + "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", + "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", + "SPARK_EXECUTOR_CORES" -> "--executor-cores", + "SPARK_YARN_QUEUE" -> "--queue", + "SPARK_YARN_APP_NAME" -> "--name", + "SPARK_YARN_DIST_FILES" -> "--files", + "SPARK_YARN_DIST_ARCHIVES" -> "--archives") + .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) @@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend( def waitForApp() { - // TODO : need a better way to find out whether the workers are ready or not + // TODO : need a better way to find out whether the executors are ready or not // maybe by resource usage report? while(true) { val report = client.getApplicationReport(appId) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 57d1577429..30735cbfdf 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ - // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3)) + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false @@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Call this to force generation of secret so it gets populated into the // hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the worker containers. + // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) // Start the user's JAR @@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } // Allocate all containers - allocateWorkers() + allocateExecutors() // Wait for the user class to Finish userThread.join() @@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, t } - // This need to happen before allocateWorkers() + // This need to happen before allocateExecutors() private def waitForSparkContextInitialized() { logInfo("Waiting for Spark context initialization") try { @@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def allocateWorkers() { + private def allocateExecutors() { try { - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numWorkers) + yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. - while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) @@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") // Launch a progress reporter thread, else the app will get killed after expiration // (def: 10mins) timeout. @@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) } sendProgress() Thread.sleep(sleepTime) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index f1c1fea0b5..b697f10391 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) +class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = @@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar waitForSparkMaster() // Allocate all containers - allocateWorkers() + allocateExecutors() // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. @@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } - private def allocateWorkers() { + private def allocateExecutors() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = @@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar preferredNodeLocationData, sparkConf) - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numWorkers) - while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) { + yarnAllocator.addResourceRequests(args.numExecutors) + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { yarnAllocator.allocateResources() Thread.sleep(100) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") } @@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val t = new Thread { override def run() { while (!driverClosed) { - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) } sendProgress() Thread.sleep(sleepTime) @@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar } -object WorkerLauncher { +object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new WorkerLauncher(args).run() + new ExecutorLauncher(args).run() } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ab4a79be70..53c403f7d0 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} import org.apache.spark.{SparkConf, Logging} -class WorkerRunnable( +class ExecutorRunnable( container: Container, conf: Configuration, spConf: SparkConf, masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) - extends Runnable with WorkerRunnableUtil with Logging { + executorMemory: Int, + executorCores: Int) + extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ @@ -55,7 +55,7 @@ class WorkerRunnable( val yarnConf: YarnConfiguration = new YarnConfiguration(conf) def run = { - logInfo("Starting Worker Container") + logInfo("Starting Executor Container") nmClient = NMClient.createNMClient() nmClient.init(yarnConf) nmClient.start() @@ -78,9 +78,9 @@ class WorkerRunnable( credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData())) - val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores) + val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores) - logInfo("Setting up worker with commands: " + commands) + logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) // Send the start request to the ContainerManager diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 1ac61124cb..e31c4060e8 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler( val conf: Configuration, val amClient: AMRMClient[ContainerRequest], val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, - val workerMemory: Int, - val workerCores: Int, + val maxExecutors: Int, + val executorMemory: Int, + val executorCores: Int, val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) @@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler( // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. private val numPendingAllocate = new AtomicInteger() - private val numWorkersRunning = new AtomicInteger() - // Used to generate a unique id per worker - private val workerIdCounter = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() - private val numWorkersFailed = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() def getNumPendingAllocate: Int = numPendingAllocate.intValue - def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumWorkersFailed: Int = numWorkersFailed.intValue + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } def releaseContainer(container: Container) { @@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler( logDebug(""" Allocated containers: %d - Current worker count: %d + Current executor count: %d Containers released: %s Containers to-be-released: %s Cluster resources: %s """.format( allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers, allocateResponse.getAvailableResources)) @@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler( // Run each of the allocated containers. for (container <- allocatedContainersToProcess) { - val numWorkersRunningNow = numWorkersRunning.incrementAndGet() - val workerHostname = container.getNodeId.getHost + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost val containerId = container.getId - val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) - assert(container.getResource.getMemory >= workerMemoryOverhead) + val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + assert(container.getResource.getMemory >= executorMemoryOverhead) - if (numWorkersRunningNow > maxWorkers) { + if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, workerHostname)) + containers for it.""".format(containerId, executorHostname)) releaseContainer(container) - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() } else { - val workerId = workerIdCounter.incrementAndGet().toString + val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) + logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) // To be safe, remove the container from `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) + val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId - allocatedContainerToHostMap.put(containerId, workerHostname) + allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } - logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname)) - val workerRunnable = new WorkerRunnable( + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname)) + val executorRunnable = new ExecutorRunnable( container, conf, sparkConf, driverUrl, - workerId, - workerHostname, - workerMemory, - workerCores) - new Thread(workerRunnable).start() + executorId, + executorHostname, + executorMemory, + executorCores) + new Thread(executorRunnable).start() } } logDebug(""" Finished allocating %s containers (from %s originally). - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( allocatedContainersToProcess, allocatedContainers, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler( // `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) } else { - // Decrement the number of workers running. The next iteration of the ApplicationMaster's + // Decrement the number of executors running. The next iteration of the ApplicationMaster's // reporting thread will take care of allocating. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId, completedContainer.getState, @@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler( // now I think its ok as none of the containers are expected to exit if (completedContainer.getExitStatus() != 0) { logInfo("Container marked as failed: " + containerId) - numWorkersFailed.incrementAndGet() + numExecutorsFailed.incrementAndGet() } } @@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler( } logDebug(""" Finished processing %d completed containers. - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( completedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler( retval } - def addResourceRequests(numWorkers: Int) { + def addResourceRequests(numExecutors: Int) { val containerRequests: List[ContainerRequest] = - if (numWorkers <= 0 || preferredHostToCount.isEmpty) { - logDebug("numWorkers: " + numWorkers + ", host preferences: " + + if (numExecutors <= 0 || preferredHostToCount.isEmpty) { + logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY).toList } else { - // Request for all hosts in preferred nodes and for numWorkers - + // Request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) for ((candidateHost, candidateCount) <- preferredHostToCount) { @@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests = createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY) val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( @@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler( amClient.addContainerRequest(request) } - if (numWorkers > 0) { - numPendingAllocate.addAndGet(numWorkers) - logInfo("Will Allocate %d worker containers, each with %d memory".format( - numWorkers, - (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) + if (numExecutors > 0) { + numPendingAllocate.addAndGet(numExecutors) + logInfo("Will Allocate %d executor containers, each with %d memory".format( + numExecutors, + (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) } else { logDebug("Empty allocation request ...") } @@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequests( requestType: AllocationType.AllocationType, resource: String, - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { @@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler( val nodeLocal = constructContainerRequests( Array(hostname), racks = null, - numWorkers, + numExecutors, priority) // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. @@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler( } case AllocationType.RACK => { val rack = resource - constructContainerRequests(hosts = null, Array(rack), numWorkers, priority) + constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) } case AllocationType.ANY => constructContainerRequests( - hosts = null, racks = null, numWorkers, priority) + hosts = null, racks = null, numExecutors, priority) case _ => throw new IllegalArgumentException( "Unexpected/unsupported request type: " + requestType) } @@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler( private def constructContainerRequests( hosts: Array[String], racks: Array[String], - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - val resource = Resource.newInstance(memoryRequest, workerCores) + val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val resource = Resource.newInstance(memoryRequest, executorCores) val prioritySetting = Records.newRecord(classOf[Priority]) prioritySetting.setPriority(priority) val requests = new ArrayBuffer[ContainerRequest]() - for (i <- 0 until numWorkers) { + for (i <- 0 until numExecutors) { requests += new ContainerRequest(resource, hosts, racks, prioritySetting) } requests @@ -574,9 +574,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, Map[String, Int](), Map[String, Int](), sparkConf) @@ -596,9 +596,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, hostToSplitCount, rackToSplitCount, sparkConf) @@ -608,9 +608,9 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - maxWorkers: Int, - workerMemory: Int, - workerCores: Int, + maxExecutors: Int, + executorMemory: Int, + executorCores: Int, map: collection.Map[String, collection.Set[SplitInfo]], sparkConf: SparkConf ): YarnAllocationHandler = { @@ -619,9 +619,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - maxWorkers, - workerMemory, - workerCores, + maxExecutors, + executorMemory, + executorCores, hostToCount, rackToCount, sparkConf) |