From 698373211ef3cdf841c82d48168cd5dbe00a57b4 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 13 Mar 2014 12:11:33 -0700 Subject: SPARK-1183. Don't use "worker" to mean executor Author: Sandy Ryza Closes #120 from sryza/sandy-spark-1183 and squashes the following commits: 5066a4a [Sandy Ryza] Remove "worker" in a couple comments 0bd1e46 [Sandy Ryza] Remove --am-class from usage bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha 607539f [Sandy Ryza] Address review comments 74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor --- .../deploy/yarn/ApplicationMasterArguments.scala | 27 ++-- .../apache/spark/deploy/yarn/ClientArguments.scala | 46 ++++-- .../org/apache/spark/deploy/yarn/ClientBase.scala | 18 +-- .../yarn/ClientDistributedCacheManager.scala | 4 +- .../spark/deploy/yarn/ExecutorRunnableUtil.scala | 176 +++++++++++++++++++++ .../spark/deploy/yarn/WorkerRunnableUtil.scala | 176 --------------------- .../cluster/YarnClientClusterScheduler.scala | 4 +- .../cluster/YarnClientSchedulerBackend.scala | 26 +-- 8 files changed, 248 insertions(+), 229 deletions(-) create mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala (limited to 'yarn/common') diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index f76a5ddd39..25cc9016b1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 + var executorCores = 1 + var numExecutors = 2 parseArgs(args.toList) @@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) { var args = inputArgs while (! args.isEmpty) { - + // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, + // the properties with executor in their names are preferred. args match { case ("--jar") :: value :: tail => userJar = value @@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) { userArgsBuffer += value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + numExecutors = value args = tail - case ("--worker-memory") :: IntParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail => + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => + executorCores = value args = tail case Nil => @@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n") + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n") System.exit(exitCode) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1f894a677d..a001060cdb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() - var workerMemory = 1024 // MB - var workerCores = 1 - var numWorkers = 2 + var executorMemory = 1024 // MB + var executorCores = 1 + var numExecutors = 2 var amQueue = sparkConf.get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" @@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { userArgsBuffer += value args = tail - case ("--master-class") :: value :: tail => + case ("--master-class" | "--am-class") :: value :: tail => + if (args(0) == "--master-class") { + println("--master-class is deprecated. Use --am-class instead.") + } amClass = value args = tail - case ("--master-memory") :: MemoryParam(value) :: tail => + case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--master-memory") { + println("--master-memory is deprecated. Use --driver-memory instead.") + } amMemory = value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + if (args(0) == "--num-workers") { + println("--num-workers is deprecated. Use --num-executors instead.") + } + numExecutors = value args = tail - case ("--worker-memory") :: MemoryParam(value) :: tail => - workerMemory = value + case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => + if (args(0) == "--worker-memory") { + println("--worker-memory is deprecated. Use --executor-memory instead.") + } + executorMemory = value args = tail - case ("--worker-cores") :: IntParam(value) :: tail => - workerCores = value + case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail => + if (args(0) == "--worker-cores") { + println("--worker-cores is deprecated. Use --executor-cores instead.") + } + executorCores = value args = tail case ("--queue") :: value :: tail => @@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1).\n" + - " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + - " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + + " --num-executors NUM Number of executors to start (Default: 2)\n" + + " --executor-cores NUM Number of cores for the executors (Default: 1).\n" + + " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" + + " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" + " --name NAME The name of your application (Default: Spark)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 74c5e0f18e..57e5761cba 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -73,10 +73,10 @@ trait ClientBase extends Logging { ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) -> "Error: You must specify a user jar when running in standalone mode!"), (args.userClass == null) -> "Error: You must specify a user class!", - (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!", + (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" + "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD), - (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" + + (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" + "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString) ).foreach { case(cond, errStr) => if (cond) { @@ -95,9 +95,9 @@ trait ClientBase extends Logging { logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) // If we have requested more then the clusters max for a single resource then exit. - if (args.workerMemory > maxMem) { - logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.workerMemory, maxMem)) + if (args.executorMemory > maxMem) { + logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.". + format(args.executorMemory, maxMem)) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -276,7 +276,7 @@ trait ClientBase extends Logging { env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - // Set the environment variables to be passed on to the Workers. + // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -360,9 +360,9 @@ trait ClientBase extends Logging { " --class " + args.userClass + " --jar " + args.userJar + userArgsToString(args) + - " --worker-memory " + args.workerMemory + - " --worker-cores " + args.workerCores + - " --num-workers " + args.numWorkers + + " --executor-memory " + args.executorMemory + + " --executor-cores " + args.executorCores + + " --num-executors " + args.numExecutors + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 535abbfb7f..68cda0f1c9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging { /** * Add a resource to the list of distributed cache resources. This list can - * be sent to the ApplicationMaster and possibly the workers so that it can + * be sent to the ApplicationMaster and possibly the executors so that it can * be downloaded into the Hadoop distributed cache for use by this application. * Adds the LocalResource to the localResources HashMap passed in and saves - * the stats of the resources to they can be sent to the workers and verified. + * the stats of the resources to they can be sent to the executors and verified. * * @param fs FileSystem * @param conf Configuration diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala new file mode 100644 index 0000000000..da0a6f74ef --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI +import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction + +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} + +import org.apache.spark.{SparkConf, Logging} +import org.apache.hadoop.yarn.conf.YarnConfiguration + + +trait ExecutorRunnableUtil extends Logging { + + val yarnConf: YarnConfiguration + val sparkConf: SparkConf + lazy val env = prepareEnvironment + + def prepareCommand( + masterAddress: String, + slaveId: String, + hostname: String, + executorMemory: Int, + executorCores: Int) = { + // Extra options for the JVM + var JAVA_OPTS = "" + // Set the JVM memory + val executorMemoryString = executorMemory + "m" + JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " + if (env.isDefinedAt("SPARK_JAVA_OPTS")) { + JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " + } + + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " + + // Commenting it out for now - so that people can refer to the properties if required. Remove + // it once cpuset version is pushed out. + // The context is, default gc for server class machines end up using all cores to do gc - hence + // if there are multiple containers in same node, spark gc effects all other containers + // performance (which can also be other spark containers) + // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in + // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset + // of cores on a node. + /* + else { + // If no java_opts specified, default to using -XX:+CMSIncrementalMode + // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont + // want to mess with it. + // In our expts, using (default) throughput collector has severe perf ramnifications in + // multi-tennent machines + // The options are based on + // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline + JAVA_OPTS += " -XX:+UseConcMarkSweepGC " + JAVA_OPTS += " -XX:+CMSIncrementalMode " + JAVA_OPTS += " -XX:+CMSIncrementalPacing " + JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " + JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " + } + */ + + var javaCommand = "java" + val javaHome = System.getenv("JAVA_HOME") + if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { + javaCommand = Environment.JAVA_HOME.$() + "/bin/java" + } + + val commands = List[String](javaCommand + + " -server " + + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. + // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in + // an inconsistent state. + // TODO: If the OOM is not recoverable by rescheduling it on different node, then do + // 'something' to fail job ... akin to blacklisting trackers in mapred ? + " -XX:OnOutOfMemoryError='kill %p' " + + JAVA_OPTS + + " org.apache.spark.executor.CoarseGrainedExecutorBackend " + + masterAddress + " " + + slaveId + " " + + hostname + " " + + executorCores + + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") + + commands + } + + private def setupDistributedCache( + file: String, + rtype: LocalResourceType, + localResources: HashMap[String, LocalResource], + timestamp: String, + size: String, + vis: String) = { + val uri = new URI(file) + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(rtype) + amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) + amJarRsrc.setTimestamp(timestamp.toLong) + amJarRsrc.setSize(size.toLong) + localResources(uri.getFragment()) = amJarRsrc + } + + def prepareLocalResources: HashMap[String, LocalResource] = { + logInfo("Preparing Local resources") + val localResources = HashMap[String, LocalResource]() + + if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') + val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') + for( i <- 0 to distFiles.length - 1) { + setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), + fileSizes(i), visibilities(i)) + } + } + + if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') + val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') + val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') + for( i <- 0 to distArchives.length - 1) { + setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, + timeStamps(i), fileSizes(i), visibilities(i)) + } + } + + logInfo("Prepared Local resources " + localResources) + localResources + } + + def prepareEnvironment: HashMap[String, String] = { + val env = new HashMap[String, String]() + + ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) + + // Allow users to specify some environment variables + Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) + + System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } + env + } + +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala deleted file mode 100644 index bfa8f84bf7..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI -import java.nio.ByteBuffer -import java.security.PrivilegedExceptionAction - -import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} - -import org.apache.spark.{SparkConf, Logging} -import org.apache.hadoop.yarn.conf.YarnConfiguration - - -trait WorkerRunnableUtil extends Logging { - - val yarnConf: YarnConfiguration - val sparkConf: SparkConf - lazy val env = prepareEnvironment - - def prepareCommand( - masterAddress: String, - slaveId: String, - hostname: String, - workerMemory: Int, - workerCores: Int) = { - // Extra options for the JVM - var JAVA_OPTS = "" - // Set the JVM memory - val workerMemoryString = workerMemory + "m" - JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " " - if (env.isDefinedAt("SPARK_JAVA_OPTS")) { - JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " - } - - JAVA_OPTS += " -Djava.io.tmpdir=" + - new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - - // Commenting it out for now - so that people can refer to the properties if required. Remove - // it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence - // if there are multiple containers in same node, spark gc effects all other containers - // performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in - // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset - // of cores on a node. - /* - else { - // If no java_opts specified, default to using -XX:+CMSIncrementalMode - // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont - // want to mess with it. - // In our expts, using (default) throughput collector has severe perf ramnifications in - // multi-tennent machines - // The options are based on - // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline - JAVA_OPTS += " -XX:+UseConcMarkSweepGC " - JAVA_OPTS += " -XX:+CMSIncrementalMode " - JAVA_OPTS += " -XX:+CMSIncrementalPacing " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " - } - */ - - var javaCommand = "java" - val javaHome = System.getenv("JAVA_HOME") - if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { - javaCommand = Environment.JAVA_HOME.$() + "/bin/java" - } - - val commands = List[String](javaCommand + - " -server " + - // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in - // an inconsistent state. - // TODO: If the OOM is not recoverable by rescheduling it on different node, then do - // 'something' to fail job ... akin to blacklisting trackers in mapred ? - " -XX:OnOutOfMemoryError='kill %p' " + - JAVA_OPTS + - " org.apache.spark.executor.CoarseGrainedExecutorBackend " + - masterAddress + " " + - slaveId + " " + - hostname + " " + - workerCores + - " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - commands - } - - private def setupDistributedCache( - file: String, - rtype: LocalResourceType, - localResources: HashMap[String, LocalResource], - timestamp: String, - size: String, - vis: String) = { - val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - amJarRsrc.setType(rtype) - amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) - amJarRsrc.setTimestamp(timestamp.toLong) - amJarRsrc.setSize(size.toLong) - localResources(uri.getFragment()) = amJarRsrc - } - - def prepareLocalResources: HashMap[String, LocalResource] = { - logInfo("Preparing Local resources") - val localResources = HashMap[String, LocalResource]() - - if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') - for( i <- 0 to distFiles.length - 1) { - setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), - fileSizes(i), visibilities(i)) - } - } - - if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') - val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') - for( i <- 0 to distArchives.length - 1) { - setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, - timeStamps(i), fileSizes(i), visibilities(i)) - } - } - - logInfo("Prepared Local resources " + localResources) - localResources - } - - def prepareEnvironment: HashMap[String, String] = { - val env = new HashMap[String, String]() - - ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) - - // Allow users to specify some environment variables - Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) - - System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - env - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 522e0a9ad7..6b91e6b9eb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** * - * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. + * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM. */ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { @@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur override def postStartHook() { - // The yarn application is running, but the worker might not yet ready + // The yarn application is running, but the executor might not yet ready // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt Thread.sleep(2000L) logInfo("YarnClientClusterScheduler.postStartHook done") diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e7130d2407..d1f13e3c36 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend( "--class", "notused", "--jar", null, "--args", hostport, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" + "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) // process any optional arguments, use the defaults already defined in ClientArguments // if things aren't specified - Map("--master-memory" -> "SPARK_MASTER_MEMORY", - "--num-workers" -> "SPARK_WORKER_INSTANCES", - "--worker-memory" -> "SPARK_WORKER_MEMORY", - "--worker-cores" -> "SPARK_WORKER_CORES", - "--queue" -> "SPARK_YARN_QUEUE", - "--name" -> "SPARK_YARN_APP_NAME", - "--files" -> "SPARK_YARN_DIST_FILES", - "--archives" -> "SPARK_YARN_DIST_ARCHIVES") - .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + Map("SPARK_MASTER_MEMORY" -> "--driver-memory", + "SPARK_DRIVER_MEMORY" -> "--driver-memory", + "SPARK_WORKER_INSTANCES" -> "--num-executors", + "SPARK_WORKER_MEMORY" -> "--executor-memory", + "SPARK_WORKER_CORES" -> "--executor-cores", + "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", + "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", + "SPARK_EXECUTOR_CORES" -> "--executor-cores", + "SPARK_YARN_QUEUE" -> "--queue", + "SPARK_YARN_APP_NAME" -> "--name", + "SPARK_YARN_DIST_FILES" -> "--files", + "SPARK_YARN_DIST_ARCHIVES" -> "--archives") + .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) @@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend( def waitForApp() { - // TODO : need a better way to find out whether the workers are ready or not + // TODO : need a better way to find out whether the executors are ready or not // maybe by resource usage report? while(true) { val report = client.getApplicationReport(appId) -- cgit v1.2.3