-rw-r--r--  docs/cluster-overview.md | 2
-rw-r--r--  docs/graphx-programming-guide.md | 2
-rw-r--r--  docs/job-scheduling.md | 4
-rw-r--r--  docs/mllib-classification-regression.md | 4
-rw-r--r--  docs/python-programming-guide.md | 6
-rw-r--r--  docs/running-on-yarn.md | 29
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 28
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 124
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 27
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 46
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 18
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala (renamed from yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala) | 14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 26
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 38
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala) | 26
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala (renamed from yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala) | 14
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 138
21 files changed, 312 insertions, 294 deletions
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index e16703292c..a555a7b502 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
(either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application.
+processes that run computations and store data for your application.
Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
the executors. Finally, SparkContext sends *tasks* for the executors to run.
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 3dfed7bea9..1238e3e0a4 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -135,7 +135,7 @@ Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Chan
structure of the graph are accomplished by producing a new graph with the desired changes. Note
that substantial parts of the original graph (i.e., unaffected structure, attributes, and indicies)
are reused in the new graph reducing the cost of this inherently functional data-structure. The
-graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with
+graph is partitioned across the executors using a range of vertex-partitioning heuristics. As with
RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index df2faa5e41..94604f301d 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -39,8 +39,8 @@ Resource allocation can be configured as follows, based on the cluster type:
* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode.
You should also set `spark.executor.memory` to control the executor memory.
-* **YARN:** The `--num-workers` option to the Spark YARN client controls how many workers it will allocate
- on the cluster, while `--worker-memory` and `--worker-cores` control the resources per worker.
+* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it will allocate
+ on the cluster, while `--executor-memory` and `--executor-cores` control the resources per executor.
A second option available on Mesos is _dynamic sharing_ of CPU cores. In this mode, each Spark application
still has a fixed and independent memory allocation (set by `spark.executor.memory`), but when the
diff --git a/docs/mllib-classification-regression.md b/docs/mllib-classification-regression.md
index 18a3e8e075..d5bd8042ca 100644
--- a/docs/mllib-classification-regression.md
+++ b/docs/mllib-classification-regression.md
@@ -77,8 +77,8 @@ between the two goals of small loss and small model complexity.
**Distributed Datasets.**
For all currently implemented optimization methods for classification, the data must be
-distributed between the worker machines *by examples*. Every machine holds a consecutive block of
-the `$n$` example/label pairs `$(\x_i,y_i)$`.
+distributed between processes on the worker machines *by examples*. Machines hold consecutive
+blocks of the `$n$` example/label pairs `$(\x_i,y_i)$`.
In other words, the input distributed dataset
([RDD](scala-programming-guide.html#resilient-distributed-datasets-rdds)) must be the set of
vectors `$\x_i\in\R^d$`.
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 57ed54c9cf..cbe7d820b4 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -43,9 +43,9 @@ def is_error(line):
errors = logData.filter(is_error)
{% endhighlight %}
-PySpark will automatically ship these functions to workers, along with any objects that they reference.
-Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
-The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
+PySpark will automatically ship these functions to executors, along with any objects that they reference.
+Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors.
+The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors.
In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b17929542c..2e9dec4856 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -41,7 +41,7 @@ System Properties:
* `spark.yarn.submit.file.replication`, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* `spark.yarn.preserve.staging.files`, set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
* `spark.yarn.scheduler.heartbeat.interval-ms`, the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
-* `spark.yarn.max.worker.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
+* `spark.yarn.max.executor.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
# Launching Spark on YARN
@@ -60,11 +60,10 @@ The command to launch the Spark application on the cluster is as follows:
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
- --num-workers <NUMBER_OF_EXECUTORS> \
- --master-class <ApplicationMaster_CLASS>
- --master-memory <MEMORY_FOR_MASTER> \
- --worker-memory <MEMORY_PER_EXECUTOR> \
- --worker-cores <CORES_PER_EXECUTOR> \
+ --num-executors <NUMBER_OF_EXECUTOR_PROCESSES> \
+ --driver-memory <MEMORY_FOR_ApplicationMaster> \
+ --executor-memory <MEMORY_PER_EXECUTOR> \
+ --executor-cores <CORES_PER_EXECUTOR> \
--name <application_name> \
--queue <queue_name> \
--addJars <any_local_files_used_in_SparkContext.addJar> \
@@ -85,10 +84,10 @@ For example:
--jar examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-cluster \
- --num-workers 3 \
- --master-memory 4g \
- --worker-memory 2g \
- --worker-cores 1
+ --num-executors 3 \
+ --driver-memory 4g \
+ --executor-memory 2g \
+ --executor-cores 1
The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.
@@ -100,12 +99,12 @@ With yarn-client mode, the application will be launched locally, just like runni
Configuration in yarn-client mode:
-In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
+In order to tune executor cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following is the list of options.
-* `SPARK_WORKER_INSTANCES`, Number of executors to start (Default: 2)
-* `SPARK_WORKER_CORES`, Number of cores per executor (Default: 1).
-* `SPARK_WORKER_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
+* `SPARK_EXECUTOR_INSTANCES`, Number of executors to start (Default: 2)
+* `SPARK_EXECUTOR_CORES`, Number of cores per executor (Default: 1).
+* `SPARK_EXECUTOR_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_DRIVER_MEMORY`, Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
* `SPARK_YARN_QUEUE`, The YARN queue to use for allocation requests (Default: 'default')
* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
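As a rough illustration of the renamed yarn-client variables in the table above, here is a minimal Scala sketch that reads the new names with an old-name fallback. The EnvTuningSketch object, the pick helper, and the fallback precedence are assumptions of this sketch, not behavior defined by the patch (the real translation lives in YarnClientSchedulerBackend, shown later in this diff).

    object EnvTuningSketch {
      // Prefer the new variable name; fall back to the deprecated one if only it is set.
      // (Precedence between old and new names is an assumption of this sketch.)
      private def pick(newName: String, oldName: String, default: String): String =
        sys.env.get(newName).orElse(sys.env.get(oldName)).getOrElse(default)

      def main(args: Array[String]): Unit = {
        val numExecutors   = pick("SPARK_EXECUTOR_INSTANCES", "SPARK_WORKER_INSTANCES", "2")
        val executorCores  = pick("SPARK_EXECUTOR_CORES", "SPARK_WORKER_CORES", "1")
        val executorMemory = pick("SPARK_EXECUTOR_MEMORY", "SPARK_WORKER_MEMORY", "1G")
        val driverMemory   = pick("SPARK_DRIVER_MEMORY", "SPARK_MASTER_MEMORY", "512M")
        println(s"executors=$numExecutors cores=$executorCores mem=$executorMemory driverMem=$driverMemory")
      }
    }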
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 87785cdc60..910484ed54 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // this need to happen before allocateWorkers
+ // this needs to happen before allocateExecutors
private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
try {
@@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
// Exists the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateContainers(
- math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.allocateContainers(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
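The maxNumExecutorFailures change above keeps backward compatibility by using the old configuration key as the default for the new one. Below is a self-contained sketch of that nested-default lookup; ConfFallbackSketch and its toy getInt are stand-ins for SparkConf, used only to make the precedence concrete.

    object ConfFallbackSketch {
      // Toy stand-in for SparkConf.getInt(key, default): look a key up in a map,
      // falling back to the supplied default when the key is absent.
      def getInt(settings: Map[String, String], key: String, default: Int): Int =
        settings.get(key).map(_.toInt).getOrElse(default)

      def main(args: Array[String]): Unit = {
        val settings = Map("spark.yarn.max.worker.failures" -> "10") // only the old key is set
        val numExecutors = 2
        // New key wins if present; otherwise the deprecated key; otherwise max(numExecutors * 2, 3).
        val maxFailures = getInt(settings, "spark.yarn.max.executor.failures",
          getInt(settings, "spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))
        println(s"maxNumExecutorFailures = $maxFailures") // prints 10
      }
    }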
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b735d01df8..7b0e020263 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
@@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
@@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
args, preferredNodeLocationData, sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
- yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingWorkerCount)
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
@@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
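allocateExecutors above polls the allocator until the requested number of executors is running or the driver shuts down. The following stand-alone sketch shows that polling pattern; AllocationLoopSketch and its ToyAllocator are illustrative stand-ins, not the YARN allocation API.

    object AllocationLoopSketch {
      // Toy allocator: each allocateContainers call brings up at most two containers.
      final class ToyAllocator {
        private var running = 0
        def getNumExecutorsRunning: Int = running
        def allocateContainers(count: Int): Unit = { running += math.min(count, 2) }
      }

      def main(args: Array[String]): Unit = {
        val numExecutors = 5
        val yarnAllocator = new ToyAllocator
        var driverClosed = false
        // Keep asking for the shortfall until everything is running (or the driver closes).
        while (yarnAllocator.getNumExecutorsRunning < numExecutors && !driverClosed) {
          yarnAllocator.allocateContainers(
            math.max(numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
          Thread.sleep(100)
        }
        println(s"All executors have launched: ${yarnAllocator.getNumExecutorsRunning}")
      }
    }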
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8c686e393f..981e8b05f6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
cm = connectToCM
startContainer
}
@@ -81,8 +81,8 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
- logInfo("Setting up worker with commands: " + commands)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e91257be8e..2056667af5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
- def allocateContainers(workersToRequest: Int) {
+ def allocateContainers(executorsToRequest: Int) {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
// Keep polling the Resource Manager for containers
- val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+ val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
if (_allocatedContainers.size > 0) {
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to be released: %s
Cluster resources: %s
""".format(
_allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
amResp.getAvailableResources))
@@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers
for (container <- allocatedContainers) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
assert(
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
}
else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
- // (workerIdCounter)
- val workerId = workerIdCounter.incrementAndGet().toString
+ // (executorIdCounter)
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("launching container on " + containerId + " host " + workerHostname)
+ logInfo("launching container on " + containerId + " host " + executorHostname)
// Just to be safe, simply remove it from pendingReleaseContainers.
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
new Thread(
- new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
- workerHostname, workerMemory, workerCores)
+ new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
+ executorHostname, executorMemory, executorCores)
).start()
}
}
logDebug("""
Finished processing %d containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler(
}
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler(
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+ private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
// default.
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+ createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
- // request for all hosts in preferred nodes and for numWorkers -
+ // request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
@@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler(
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
- if (numWorkers > 0) {
- logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
- workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ if (numExecutors > 0) {
+ logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+ executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
@@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
requestType: AllocationType.AllocationType,
resource:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
// If hostname specified, we need atleast two requests - node local and rack local.
@@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler(
case AllocationType.HOST => {
assert(YarnAllocationHandler.ANY_HOST != resource)
val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
+ val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
YarnAllocationHandler.populateRackInfo(conf, hostname)
@@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- createResourceRequestImpl(rack, numWorkers, priority)
+ createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+ YarnAllocationHandler.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequestImpl(
hostname:String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int): ResourceRequest = {
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
@@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
rsrcRequest.setHostName(hostname)
- rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
+ rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
rsrcRequest
}
@@ -560,9 +560,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -582,9 +582,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToCount,
rackToCount,
sparkConf)
@@ -594,9 +594,9 @@ object YarnAllocationHandler {
conf: Configuration,
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf): YarnAllocationHandler = {
@@ -606,9 +606,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)
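Both the resource request and the satisfaction check in the handler above pad the executor memory with YarnAllocationHandler.MEMORY_OVERHEAD. A small sketch of that accounting follows; MemoryOverheadSketch, the OverheadMb constant (384 MB), and the container sizes are assumed values for illustration only.

    object MemoryOverheadSketch {
      // Illustrative stand-in for YarnAllocationHandler.MEMORY_OVERHEAD.
      val OverheadMb = 384

      def requestedCapability(executorMemoryMb: Int): Int = executorMemoryMb + OverheadMb

      def isResourceConstraintSatisfied(containerMemoryMb: Int, executorMemoryMb: Int): Boolean =
        containerMemoryMb >= executorMemoryMb + OverheadMb

      def main(args: Array[String]): Unit = {
        val executorMemoryMb = 2048
        println(s"ask YARN for ${requestedCapability(executorMemoryMb)} MB per container") // 2432
        println(isResourceConstraintSatisfied(2432, executorMemoryMb)) // true: 2432 >= 2048 + 384
        println(isResourceConstraintSatisfied(2048, executorMemoryMb)) // false: overhead not included
      }
    }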
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f76a5ddd39..25cc9016b1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024
+ var executorCores = 1
+ var numExecutors = 2
parseArgs(args.toList)
@@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
var args = inputArgs
while (! args.isEmpty) {
-
+ // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
+ // the equivalent options with executor in their names are preferred.
args match {
case ("--jar") :: value :: tail =>
userJar = value
@@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ numExecutors = value
args = tail
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
+ executorCores = value
args = tail
case Nil =>
@@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
System.exit(exitCode)
}
}
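The parser above accepts both the deprecated and the new spelling of each flag by listing them as alternatives in a single case pattern. Here is a self-contained sketch of that technique; FlagAliasSketch and its tiny option list are illustrative only.

    object FlagAliasSketch {
      def parse(args: List[String], numExecutors: Int = 2): Int = args match {
        // Either the deprecated or the new flag name binds the same setting.
        case ("--num-workers" | "--num-executors") :: value :: tail =>
          parse(tail, value.toInt)
        case _ :: tail => parse(tail, numExecutors)
        case Nil       => numExecutors
      }

      def main(args: Array[String]): Unit = {
        println(parse(List("--num-workers", "4")))   // 4, via the deprecated spelling
        println(parse(List("--num-executors", "4"))) // 4, via the new spelling
      }
    }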
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1f894a677d..a001060cdb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024 // MB
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024 // MB
+ var executorCores = 1
+ var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
@@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgsBuffer += value
args = tail
- case ("--master-class") :: value :: tail =>
+ case ("--master-class" | "--am-class") :: value :: tail =>
+ if (args(0) == "--master-class") {
+ println("--master-class is deprecated. Use --am-class instead.")
+ }
amClass = value
args = tail
- case ("--master-memory") :: MemoryParam(value) :: tail =>
+ case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--master-memory") {
+ println("--master-memory is deprecated. Use --driver-memory instead.")
+ }
amMemory = value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ if (args(0) == "--num-workers") {
+ println("--num-workers is deprecated. Use --num-executors instead.")
+ }
+ numExecutors = value
args = tail
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--worker-memory") {
+ println("--worker-memory is deprecated. Use --executor-memory instead.")
+ }
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
+ if (args(0) == "--worker-cores") {
+ println("--worker-cores is deprecated. Use --executor-cores instead.")
+ }
+ executorCores = value
args = tail
case ("--queue") :: value :: tail =>
@@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
- " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
+ " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 74c5e0f18e..57e5761cba 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -73,10 +73,10 @@ trait ClientBase extends Logging {
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
- (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
+ (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
+ (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
).foreach { case(cond, errStr) =>
if (cond) {
@@ -95,9 +95,9 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// If we have requested more then the clusters max for a single resource then exit.
- if (args.workerMemory > maxMem) {
- logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
- format(args.workerMemory, maxMem))
+ if (args.executorMemory > maxMem) {
+ logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
+ format(args.executorMemory, maxMem))
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -276,7 +276,7 @@ trait ClientBase extends Logging {
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
- // Set the environment variables to be passed on to the Workers.
+ // Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -360,9 +360,9 @@ trait ClientBase extends Logging {
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
- " --worker-memory " + args.workerMemory +
- " --worker-cores " + args.workerCores +
- " --num-workers " + args.numWorkers +
+ " --executor-memory " + args.executorMemory +
+ " --executor-cores " + args.executorCores +
+ " --num-executors " + args.numExecutors +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 535abbfb7f..68cda0f1c9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Add a resource to the list of distributed cache resources. This list can
- * be sent to the ApplicationMaster and possibly the workers so that it can
+ * be sent to the ApplicationMaster and possibly the executors so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
- * the stats of the resources to they can be sent to the workers and verified.
+ * the stats of the resources so they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index bfa8f84bf7..da0a6f74ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging}
import org.apache.hadoop.yarn.conf.YarnConfiguration
-trait WorkerRunnableUtil extends Logging {
+trait ExecutorRunnableUtil extends Logging {
val yarnConf: YarnConfiguration
val sparkConf: SparkConf
@@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging {
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int) = {
+ executorMemory: Int,
+ executorCores: Int) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+ val executorMemoryString = executorMemory + "m"
+ JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
@@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging {
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+ // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
@@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging {
masterAddress + " " +
slaveId + " " +
hostname + " " +
- workerCores +
+ executorCores +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9ad7..6b91e6b9eb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
/**
*
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
@@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
- // The yarn application is running, but the worker might not yet ready
+ // The yarn application is running, but the executor might not be ready yet
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e7130d2407..d1f13e3c36 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
"--class", "notused",
"--jar", null,
"--args", hostport,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+ "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
// process any optional arguments, use the defaults already defined in ClientArguments
// if things aren't specified
- Map("--master-memory" -> "SPARK_MASTER_MEMORY",
- "--num-workers" -> "SPARK_WORKER_INSTANCES",
- "--worker-memory" -> "SPARK_WORKER_MEMORY",
- "--worker-cores" -> "SPARK_WORKER_CORES",
- "--queue" -> "SPARK_YARN_QUEUE",
- "--name" -> "SPARK_YARN_APP_NAME",
- "--files" -> "SPARK_YARN_DIST_FILES",
- "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
- .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+ Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
+ "SPARK_DRIVER_MEMORY" -> "--driver-memory",
+ "SPARK_WORKER_INSTANCES" -> "--num-executors",
+ "SPARK_WORKER_MEMORY" -> "--executor-memory",
+ "SPARK_WORKER_CORES" -> "--executor-cores",
+ "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
+ "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
+ "SPARK_EXECUTOR_CORES" -> "--executor-cores",
+ "SPARK_YARN_QUEUE" -> "--queue",
+ "SPARK_YARN_APP_NAME" -> "--name",
+ "SPARK_YARN_DIST_FILES" -> "--files",
+ "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
+ .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
@@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
def waitForApp() {
- // TODO : need a better way to find out whether the workers are ready or not
+ // TODO : need a better way to find out whether the executors are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)
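The backend above drives the translation from environment variables to the renamed client flags through a Map, so both the old SPARK_WORKER_* / SPARK_MASTER_MEMORY names and their SPARK_EXECUTOR_* / SPARK_DRIVER_MEMORY replacements feed the same flag. A stripped-down sketch of that mechanism follows; EnvFlagMapSketch and its local addArg are stand-ins (the real addArg is a private helper of the backend).

    import scala.collection.mutable.ArrayBuffer

    object EnvFlagMapSketch {
      // Append "--flag value" to the buffer when the environment variable is set.
      def addArg(flag: String, envVar: String, buf: ArrayBuffer[String]): Unit =
        sys.env.get(envVar).foreach(value => buf ++= Seq(flag, value))

      def main(args: Array[String]): Unit = {
        val argsArrayBuf = new ArrayBuffer[String]()
        // Old and new variable names map to the same renamed flag, so either spelling works.
        Map("SPARK_WORKER_INSTANCES"   -> "--num-executors",
            "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
            "SPARK_WORKER_MEMORY"      -> "--executor-memory",
            "SPARK_EXECUTOR_MEMORY"    -> "--executor-memory",
            "SPARK_MASTER_MEMORY"      -> "--driver-memory",
            "SPARK_DRIVER_MEMORY"      -> "--driver-memory")
          .foreach { case (envVar, flag) => addArg(flag, envVar, argsArrayBuf) }
        println(argsArrayBuf.mkString(" "))
      }
    }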
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 57d1577429..30735cbfdf 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
- // Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3))
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the worker containers.
+ // doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
- // This need to happen before allocateWorkers()
+ // This needs to happen before allocateExecutors()
private def waitForSparkContextInitialized() {
logInfo("Waiting for Spark context initialization")
try {
@@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
+ yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
- while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
@@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
+ "max number of executor failures reached")
}
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
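The stable reporter thread above subtracts pending allocation requests as well as running executors before asking YARN for more containers. A tiny sketch of that bookkeeping follows; MissingExecutorSketch and its hard-coded counts are illustrative only.

    object MissingExecutorSketch {
      // Only ask for what is neither running nor already requested.
      def missingExecutorCount(target: Int, running: Int, pending: Int): Int =
        math.max(target - running - pending, 0)

      def main(args: Array[String]): Unit = {
        println(missingExecutorCount(target = 10, running = 6, pending = 3)) // 1
        println(missingExecutorCount(target = 10, running = 8, pending = 4)) // 0, nothing extra requested
      }
    }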
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f1c1fea0b5..b697f10391 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
@@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
- allocateWorkers()
+ allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
- private def allocateWorkers() {
+ private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
preferredNodeLocationData,
sparkConf)
- logInfo("Allocating " + args.numWorkers + " workers.")
+ logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- yarnAllocator.addResourceRequests(args.numWorkers)
- while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
+ yarnAllocator.addResourceRequests(args.numExecutors)
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
- logInfo("All workers have launched.")
+ logInfo("All executors have launched.")
}
@@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
- if (missingWorkerCount > 0) {
+ if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingWorkerCount))
- yarnAllocator.addResourceRequests(missingWorkerCount)
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
@@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
-object WorkerLauncher {
+object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new WorkerLauncher(args).run()
+ new ExecutorLauncher(args).run()
}
}
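
The renamed allocateExecutors in the launcher reduces to a request-then-poll loop; roughly, under the same stand-in-function convention as the sketch above:

object AllocateLoopSketch {
  // Request the full executor count once, then poll the allocator until the
  // target number is running or the driver connection goes away.
  def allocateExecutors(
      numExecutors: Int,
      numExecutorsRunning: () => Int,
      addResourceRequests: Int => Unit,
      allocateResources: () => Unit,
      driverClosed: () => Boolean): Unit = {
    addResourceRequests(numExecutors)
    while (numExecutorsRunning() < numExecutors && !driverClosed()) {
      allocateResources()
      Thread.sleep(100) // same polling interval as the code above
    }
  }
}
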
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ab4a79be70..53c403f7d0 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SparkConf, Logging}
-class WorkerRunnable(
+class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int)
- extends Runnable with WorkerRunnableUtil with Logging {
+ executorMemory: Int,
+ executorCores: Int)
+ extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
@@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
- logInfo("Starting Worker Container")
+ logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
@@ -78,9 +78,9 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
- val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
+ val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
- logInfo("Setting up worker with commands: " + commands)
+ logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
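
For context, the stable-API calls the renamed ExecutorRunnable makes are roughly the following. This sketch keeps only the NMClient handshake and omits token, environment, and local-resource setup; the command argument is a placeholder for whatever prepareCommand returns.

import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import scala.collection.JavaConverters._

object ExecutorLaunchSketch {
  def launch(container: Container, command: String): Unit = {
    // Talk to the NodeManager that owns the allocated container.
    val nmClient = NMClient.createNMClient()
    nmClient.init(new YarnConfiguration())
    nmClient.start()

    // Describe what to run inside the container; the real code also sets
    // localized resources, environment variables, and delegation tokens.
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    ctx.setCommands(List(command).asJava)

    // Hand the launch context to the NodeManager.
    nmClient.startContainer(container, ctx)
  }
}
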
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 1ac61124cb..e31c4060e8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],
val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
+ val maxExecutors: Int,
+ val executorMemory: Int,
+ val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
def getNumPendingAllocate: Int = numPendingAllocate.intValue
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def releaseContainer(container: Container) {
@@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
logDebug("""
Allocated containers: %d
- Current worker count: %d
+ Current executor count: %d
Containers released: %s
Containers to-be-released: %s
Cluster resources: %s
""".format(
allocatedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
allocateResponse.getAvailableResources))
@@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers.
for (container <- allocatedContainersToProcess) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- assert(container.getResource.getMemory >= workerMemoryOverhead)
+ val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ assert(container.getResource.getMemory >= executorMemoryOverhead)
- if (numWorkersRunningNow > maxWorkers) {
+ if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
+ containers for it.""".format(containerId, executorHostname))
releaseContainer(container)
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
} else {
- val workerId = workerIdCounter.incrementAndGet().toString
+ val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+ val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
+ allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
- logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname))
- val workerRunnable = new WorkerRunnable(
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+ val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
- workerId,
- workerHostname,
- workerMemory,
- workerCores)
- new Thread(workerRunnable).start()
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores)
+ new Thread(executorRunnable).start()
}
}
logDebug("""
Finished allocating %s containers (from %s originally).
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainersToProcess,
allocatedContainers,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
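
Two small predicates drive the per-container decisions in the hunk above: the memory check applied by isResourceConstraintSatisfied and the surplus check that releases containers beyond the target. Stated on their own (the 384 MB overhead is an assumption about MEMORY_OVERHEAD's value, not something visible in this diff):

object ContainerChecksSketch {
  val MemoryOverheadMb = 384 // assumed value of YarnAllocationHandler.MEMORY_OVERHEAD

  // A granted container is usable only if it covers the executor memory plus overhead.
  def satisfiesResources(containerMemoryMb: Int, executorMemoryMb: Int): Boolean =
    containerMemoryMb >= executorMemoryMb + MemoryOverheadMb

  // Once the running count would exceed the target, the extra container is
  // released and the counter decremented, as in the loop above.
  def isSurplus(numExecutorsRunningNow: Int, maxExecutors: Int): Boolean =
    numExecutorsRunningNow > maxExecutors
}
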
@@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
- // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+ // Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
- numWorkersRunning.decrementAndGet()
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
// now I think it's ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
+ numExecutorsFailed.incrementAndGet()
}
}
@@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
- Current number of workers running: %d,
+ Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
- numWorkersRunning.get(),
+ numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
- def addResourceRequests(numWorkers: Int) {
+ def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+ if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY).toList
} else {
- // Request for all hosts in preferred nodes and for numWorkers -
+ // Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests = createResourceRequests(
AllocationType.ANY,
resource = null,
- numWorkers,
+ numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
amClient.addContainerRequest(request)
}
- if (numWorkers > 0) {
- numPendingAllocate.addAndGet(numWorkers)
- logInfo("Will Allocate %d worker containers, each with %d memory".format(
- numWorkers,
- (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+ if (numExecutors > 0) {
+ numPendingAllocate.addAndGet(numExecutors)
+ logInfo("Will Allocate %d executor containers, each with %d memory".format(
+ numExecutors,
+ (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
} else {
logDebug("Empty allocation request ...")
}
@@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequests(
requestType: AllocationType.AllocationType,
resource: String,
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
@@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
val nodeLocal = constructContainerRequests(
Array(hostname),
racks = null,
- numWorkers,
+ numExecutors,
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+ constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
}
case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numWorkers, priority)
+ hosts = null, racks = null, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
private def constructContainerRequests(
hosts: Array[String],
racks: Array[String],
- numWorkers: Int,
+ numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val resource = Resource.newInstance(memoryRequest, workerCores)
+ val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numWorkers) {
+ for (i <- 0 until numExecutors) {
requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
@@ -574,9 +574,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@@ -596,9 +596,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
+ args.numExecutors,
+ args.executorMemory,
+ args.executorCores,
hostToSplitCount,
rackToSplitCount,
sparkConf)
@@ -608,9 +608,9 @@ object YarnAllocationHandler {
conf: Configuration,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
+ maxExecutors: Int,
+ executorMemory: Int,
+ executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf
): YarnAllocationHandler = {
@@ -619,9 +619,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
+ maxExecutors,
+ executorMemory,
+ executorCores,
hostToCount,
rackToCount,
sparkConf)
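
Finally, the request-construction path renamed in this file builds one ContainerRequest per desired executor, sized to the executor memory plus the fixed overhead. A compact sketch using only stable-API types already visible in the diff, again treating the 384 MB overhead as an assumption:

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records
import scala.collection.mutable.ArrayBuffer

object RequestSketch {
  val MemoryOverheadMb = 384 // assumed MEMORY_OVERHEAD

  def containerRequests(
      hosts: Array[String],
      racks: Array[String],
      numExecutors: Int,
      executorMemory: Int,
      executorCores: Int,
      priority: Int): ArrayBuffer[ContainerRequest] = {
    // Each executor container asks for the executor memory plus overhead and
    // the configured core count.
    val resource = Resource.newInstance(executorMemory + MemoryOverheadMb, executorCores)
    val prioritySetting = Records.newRecord(classOf[Priority])
    prioritySetting.setPriority(priority)
    val requests = new ArrayBuffer[ContainerRequest]()
    for (_ <- 0 until numExecutors) {
      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
    }
    requests
  }
}
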