author     Sandy Ryza <sandy@cloudera.com>          2014-03-13 12:11:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-03-13 12:11:33 -0700
commit     698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch)
tree       a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/common
parent     e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff)
download   spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.gz
           spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.bz2
           spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.zip
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/common')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 27
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 46
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 18
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala (renamed from yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala) | 14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 26
7 files changed, 79 insertions, 60 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f76a5ddd39..25cc9016b1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024
+ var executorCores = 1
+ var numExecutors = 2
parseArgs(args.toList)
@@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
var args = inputArgs
while (! args.isEmpty) {
-
+ // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
+ // the properties with executor in their names are preferred.
args match {
case ("--jar") :: value :: tail =>
userJar = value
@@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ numExecutors = value
args = tail
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
+ executorCores = value
args = tail
case Nil =>
@@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
System.exit(exitCode)
}
}
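The parsing change above leans on Scala's alternative patterns: a single case arm written as ("--num-workers" | "--num-executors") matches either spelling of the flag against the head of the remaining argument list, so the deprecated name keeps working. A minimal, self-contained sketch of that technique (the IntParam extractor here is a simplified stand-in for Spark's org.apache.spark.util.IntParam, and ParseDemo is a hypothetical name):

// Simplified stand-in for Spark's IntParam extractor.
object IntParam {
  def unapply(s: String): Option[Int] = scala.util.Try(s.toInt).toOption
}

object ParseDemo extends App {
  var numExecutors = 2                      // default, as in the patch
  var args = List("--num-workers", "4")     // deprecated spelling still parses

  while (!args.isEmpty) {
    args match {
      // One case arm accepts both the old and the new flag name.
      case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
        numExecutors = value
        args = tail
      case _ :: tail =>
        args = tail
    }
  }
  println("numExecutors = " + numExecutors) // prints: numExecutors = 4
}

Alternatives inside a sub-pattern are allowed here precisely because the string literals bind no variables, which is what lets one arm serve both flags.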
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1f894a677d..a001060cdb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024 // MB
- var workerCores = 1
- var numWorkers = 2
+ var executorMemory = 1024 // MB
+ var executorCores = 1
+ var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
@@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgsBuffer += value
args = tail
- case ("--master-class") :: value :: tail =>
+ case ("--master-class" | "--am-class") :: value :: tail =>
+ if (args(0) == "--master-class") {
+ println("--master-class is deprecated. Use --am-class instead.")
+ }
amClass = value
args = tail
- case ("--master-memory") :: MemoryParam(value) :: tail =>
+ case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--master-memory") {
+ println("--master-memory is deprecated. Use --driver-memory instead.")
+ }
amMemory = value
args = tail
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
+ case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
+ if (args(0) == "--num-workers") {
+ println("--num-workers is deprecated. Use --num-executors instead.")
+ }
+ numExecutors = value
args = tail
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
+ case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
+ if (args(0) == "--worker-memory") {
+ println("--worker-memory is deprecated. Use --executor-memory instead.")
+ }
+ executorMemory = value
args = tail
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
+ case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail =>
+ if (args(0) == "--worker-cores") {
+ println("--worker-cores is deprecated. Use --executor-cores instead.")
+ }
+ executorCores = value
args = tail
case ("--queue") :: value :: tail =>
@@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
- " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --num-executors NUM Number of executors to start (Default: 2)\n" +
+ " --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
+ " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+ " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 74c5e0f18e..57e5761cba 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -73,10 +73,10 @@ trait ClientBase extends Logging {
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
- (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
+ (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
+ (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
).foreach { case(cond, errStr) =>
if (cond) {
@@ -95,9 +95,9 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// If we have requested more then the clusters max for a single resource then exit.
- if (args.workerMemory > maxMem) {
- logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
- format(args.workerMemory, maxMem))
+ if (args.executorMemory > maxMem) {
+ logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
+ format(args.executorMemory, maxMem))
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -276,7 +276,7 @@ trait ClientBase extends Logging {
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
- // Set the environment variables to be passed on to the Workers.
+ // Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -360,9 +360,9 @@ trait ClientBase extends Logging {
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
- " --worker-memory " + args.workerMemory +
- " --worker-cores " + args.workerCores +
- " --num-workers " + args.numWorkers +
+ " --executor-memory " + args.executorMemory +
+ " --executor-cores " + args.executorCores +
+ " --num-executors " + args.numExecutors +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 535abbfb7f..68cda0f1c9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Add a resource to the list of distributed cache resources. This list can
- * be sent to the ApplicationMaster and possibly the workers so that it can
+ * be sent to the ApplicationMaster and possibly the executors so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
- * the stats of the resources to they can be sent to the workers and verified.
+ * the stats of the resources to they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index bfa8f84bf7..da0a6f74ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging}
import org.apache.hadoop.yarn.conf.YarnConfiguration
-trait WorkerRunnableUtil extends Logging {
+trait ExecutorRunnableUtil extends Logging {
val yarnConf: YarnConfiguration
val sparkConf: SparkConf
@@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging {
masterAddress: String,
slaveId: String,
hostname: String,
- workerMemory: Int,
- workerCores: Int) = {
+ executorMemory: Int,
+ executorCores: Int) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+ val executorMemoryString = executorMemory + "m"
+ JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
@@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging {
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+ // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
@@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging {
masterAddress + " " +
slaveId + " " +
hostname + " " +
- workerCores +
+ executorCores +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9ad7..6b91e6b9eb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
/**
*
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
@@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
- // The yarn application is running, but the worker might not yet ready
+ // The yarn application is running, but the executor might not yet ready
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e7130d2407..d1f13e3c36 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
"--class", "notused",
"--jar", null,
"--args", hostport,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+ "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
// process any optional arguments, use the defaults already defined in ClientArguments
// if things aren't specified
- Map("--master-memory" -> "SPARK_MASTER_MEMORY",
- "--num-workers" -> "SPARK_WORKER_INSTANCES",
- "--worker-memory" -> "SPARK_WORKER_MEMORY",
- "--worker-cores" -> "SPARK_WORKER_CORES",
- "--queue" -> "SPARK_YARN_QUEUE",
- "--name" -> "SPARK_YARN_APP_NAME",
- "--files" -> "SPARK_YARN_DIST_FILES",
- "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
- .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+ Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
+ "SPARK_DRIVER_MEMORY" -> "--driver-memory",
+ "SPARK_WORKER_INSTANCES" -> "--num-executors",
+ "SPARK_WORKER_MEMORY" -> "--executor-memory",
+ "SPARK_WORKER_CORES" -> "--executor-cores",
+ "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
+ "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
+ "SPARK_EXECUTOR_CORES" -> "--executor-cores",
+ "SPARK_YARN_QUEUE" -> "--queue",
+ "SPARK_YARN_APP_NAME" -> "--name",
+ "SPARK_YARN_DIST_FILES" -> "--files",
+ "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
+ .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
@@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
def waitForApp() {
- // TODO : need a better way to find out whether the workers are ready or not
+ // TODO : need a better way to find out whether the executors are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)
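The remapped table above now goes from environment variable to option name, so both the legacy SPARK_WORKER_* variables and the new SPARK_EXECUTOR_* ones feed the same --num-executors/--executor-memory/--executor-cores flags. A sketch of the translation loop, with a simplified addArg standing in for the private helper in the backend (only two of the mappings are shown):

import scala.collection.mutable.ArrayBuffer

object EnvToArgsDemo extends App {
  // Simplified stand-in for the backend's addArg helper: append the option
  // and its value only when the environment variable is actually set.
  def addArg(optName: String, envVar: String, buf: ArrayBuffer[String]): Unit = {
    sys.env.get(envVar).foreach { value =>
      buf += optName
      buf += value
    }
  }

  val argsArrayBuf = new ArrayBuffer[String]()
  Map("SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
      "SPARK_EXECUTOR_CORES" -> "--executor-cores")
    .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }

  println(argsArrayBuf.mkString(" "))
}

Because unset variables contribute nothing, the defaults already defined in ClientArguments still apply when nothing is exported, which is what the comment in the hunk promises.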