diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-03-13 12:11:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-13 12:11:33 -0700 |
commit | 698373211ef3cdf841c82d48168cd5dbe00a57b4 (patch) | |
tree | a07edbe4835a7b01aa48cf9bd35c0d6939d21d78 /yarn/common | |
parent | e4e8d8f395aea48f0cae00d7c381a863c48a2837 (diff) | |
download | spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.gz spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.tar.bz2 spark-698373211ef3cdf841c82d48168cd5dbe00a57b4.zip |
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>
Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:
5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Diffstat (limited to 'yarn/common')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 27 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 46 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 18 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 4 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala (renamed from yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala) | 14 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 4 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 26 |
7 files changed, 79 insertions, 60 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index f76a5ddd39..25cc9016b1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 + var executorCores = 1 + var numExecutors = 2 parseArgs(args.toList) @@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) { var args = inputArgs while (! args.isEmpty) { - + // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, + // the properties with executor in their names are preferred. args match { case ("--jar") :: value :: tail => userJar = value @@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) { userArgsBuffer += value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + numExecutors = value args = tail - case ("--worker-memory") :: IntParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail => + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => + executorCores = value args = tail case Nil => @@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n") + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n") System.exit(exitCode) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1f894a677d..a001060cdb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 // MB - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 // MB + var executorCores = 1 + var numExecutors = 2 var amQueue = sparkConf.get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" @@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { userArgsBuffer += value args = tail - case ("--master-class") :: value :: tail => + case ("--master-class" | "--am-class") :: value :: tail => + if (args(0) == "--master-class") { + println("--master-class is deprecated. Use --am-class instead.") + } amClass = value args = tail - case ("--master-memory") :: MemoryParam(value) :: tail => + case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--master-memory") { + println("--master-memory is deprecated. Use --driver-memory instead.") + } amMemory = value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + if (args(0) == "--num-workers") { + println("--num-workers is deprecated. Use --num-executors instead.") + } + numExecutors = value args = tail - case ("--worker-memory") :: MemoryParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--worker-memory") { + println("--worker-memory is deprecated. Use --executor-memory instead.") + } + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail => + if (args(0) == "--worker-cores") { + println("--worker-cores is deprecated. Use --executor-cores instead.") + } + executorCores = value args = tail case ("--queue") :: value :: tail => @@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1).\n" + - " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + - " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1).\n" + + " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" + " --name NAME The name of your application (Default: Spark)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 74c5e0f18e..57e5761cba 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -73,10 +73,10 @@ trait ClientBase extends Logging { ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) -> "Error: You must specify a user jar when running in standalone mode!"), (args.userClass == null) -> "Error: You must specify a user class!", - (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!", + (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" + "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD), - (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" + + (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" + "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString) ).foreach { case(cond, errStr) => if (cond) { @@ -95,9 +95,9 @@ trait ClientBase extends Logging { logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) // If we have requested more then the clusters max for a single resource then exit. - if (args.workerMemory > maxMem) { - logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.workerMemory, maxMem)) + if (args.executorMemory > maxMem) { + logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.". + format(args.executorMemory, maxMem)) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -276,7 +276,7 @@ trait ClientBase extends Logging { env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - // Set the environment variables to be passed on to the Workers. + // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -360,9 +360,9 @@ trait ClientBase extends Logging { " --class " + args.userClass + " --jar " + args.userJar + userArgsToString(args) + - " --worker-memory " + args.workerMemory + - " --worker-cores " + args.workerCores + - " --num-workers " + args.numWorkers + + " --executor-memory " + args.executorMemory + + " --executor-cores " + args.executorCores + + " --num-executors " + args.numExecutors + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 535abbfb7f..68cda0f1c9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging { /** * Add a resource to the list of distributed cache resources. This list can - * be sent to the ApplicationMaster and possibly the workers so that it can + * be sent to the ApplicationMaster and possibly the executors so that it can * be downloaded into the Hadoop distributed cache for use by this application. * Adds the LocalResource to the localResources HashMap passed in and saves - * the stats of the resources to they can be sent to the workers and verified. + * the stats of the resources to they can be sent to the executors and verified. * * @param fs FileSystem * @param conf Configuration diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index bfa8f84bf7..da0a6f74ef 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging} import org.apache.hadoop.yarn.conf.YarnConfiguration -trait WorkerRunnableUtil extends Logging { +trait ExecutorRunnableUtil extends Logging { val yarnConf: YarnConfiguration val sparkConf: SparkConf @@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging { masterAddress: String, slaveId: String, hostname: String, - workerMemory: Int, - workerCores: Int) = { + executorMemory: Int, + executorCores: Int) = { // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory - val workerMemoryString = workerMemory + "m" - JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " " + val executorMemoryString = executorMemory + "m" + JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } @@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging { val commands = List[String](javaCommand + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in + // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in // an inconsistent state. // TODO: If the OOM is not recoverable by rescheduling it on different node, then do // 'something' to fail job ... akin to blacklisting trackers in mapred ? @@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging { masterAddress + " " + slaveId + " " + hostname + " " + - workerCores + + executorCores + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 522e0a9ad7..6b91e6b9eb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** * - * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. + * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM. */ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { @@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur override def postStartHook() { - // The yarn application is running, but the worker might not yet ready + // The yarn application is running, but the executor might not yet ready // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt Thread.sleep(2000L) logInfo("YarnClientClusterScheduler.postStartHook done") diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e7130d2407..d1f13e3c36 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend( "--class", "notused", "--jar", null, "--args", hostport, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" + "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) // process any optional arguments, use the defaults already defined in ClientArguments // if things aren't specified - Map("--master-memory" -> "SPARK_MASTER_MEMORY", - "--num-workers" -> "SPARK_WORKER_INSTANCES", - "--worker-memory" -> "SPARK_WORKER_MEMORY", - "--worker-cores" -> "SPARK_WORKER_CORES", - "--queue" -> "SPARK_YARN_QUEUE", - "--name" -> "SPARK_YARN_APP_NAME", - "--files" -> "SPARK_YARN_DIST_FILES", - "--archives" -> "SPARK_YARN_DIST_ARCHIVES") - .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + Map("SPARK_MASTER_MEMORY" -> "--driver-memory", + "SPARK_DRIVER_MEMORY" -> "--driver-memory", + "SPARK_WORKER_INSTANCES" -> "--num-executors", + "SPARK_WORKER_MEMORY" -> "--executor-memory", + "SPARK_WORKER_CORES" -> "--executor-cores", + "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", + "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", + "SPARK_EXECUTOR_CORES" -> "--executor-cores", + "SPARK_YARN_QUEUE" -> "--queue", + "SPARK_YARN_APP_NAME" -> "--name", + "SPARK_YARN_DIST_FILES" -> "--files", + "SPARK_YARN_DIST_ARCHIVES" -> "--archives") + .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) @@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend( def waitForApp() { - // TODO : need a better way to find out whether the workers are ready or not + // TODO : need a better way to find out whether the executors are ready or not // maybe by resource usage report? while(true) { val report = client.getApplicationReport(appId) |