path: root/yarn
author    Kostas Sakellis <kostas@cloudera.com>    2015-01-05 23:26:33 -0800
committer Josh Rosen <joshrosen@databricks.com>    2015-01-05 23:26:33 -0800
commit    451546aa6d2e61e43b0c0f0669f18cfb7489e584 (patch)
tree      b2ca14f6b27a4582f134eabc4311077a5e329f05 /yarn
parent    04d55d8e8e4890d110ce5561b5c1ae608c34a7c9 (diff)
SPARK-4843 [YARN] Squash ExecutorRunnableUtil and ExecutorRunnable
ExecutorRunnableUtil is a parent of ExecutorRunnable because of the yarn-alpha and
yarn-stable split. Now that yarn-alpha is gone, this commit squashes the unnecessary
hierarchy. The methods from ExecutorRunnableUtil are added as private.

Author: Kostas Sakellis <kostas@cloudera.com>

Closes #3696 from ksakellis/kostas-spark-4843 and squashes the following commits:

486716f [Kostas Sakellis] Moved prepareEnvironment call to after yarnConf declaration
470e22e [Kostas Sakellis] Fixed indentation and renamed sparkConf variable
9b1b1c9 [Kostas Sakellis] SPARK-4843 [YARN] Squash ExecutorRunnableUtil and ExecutorRunnable
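To make the shape of the change concrete before reading the diff: the mixin hierarchy collapses into a single class whose helpers become private members. The following is a simplified sketch with elided parameters and toy method bodies; the Before/After class names exist only for the sketch, and the real signatures are in the patch below.

// Before this commit: helpers came from a mixin trait left over from the
// yarn-alpha / yarn-stable split.
trait ExecutorRunnableUtil {
  lazy val env: Map[String, String] = prepareEnvironment
  def prepareCommand(executorMemory: Int): List[String]
  def prepareEnvironment: Map[String, String]
}

class ExecutorRunnableBefore extends Runnable with ExecutorRunnableUtil {
  override def run(): Unit = ()
  def prepareCommand(executorMemory: Int): List[String] = List(s"-Xms${executorMemory}m")
  def prepareEnvironment: Map[String, String] = sys.env.filter(_._1.startsWith("SPARK"))
}

// After this commit: one class, with the former trait members folded in as private methods.
class ExecutorRunnableAfter extends Runnable {
  lazy val env: Map[String, String] = prepareEnvironment  // declared after the fields it reads (cf. fixup 486716f)
  override def run(): Unit = ()
  private def prepareCommand(executorMemory: Int): List[String] = List(s"-Xms${executorMemory}m")
  private def prepareEnvironment: Map[String, String] = sys.env.filter(_._1.startsWith("SPARK"))
}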
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala     | 182
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 203
2 files changed, 172 insertions, 213 deletions
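For orientation, here is roughly what the launch command assembled by the now-private prepareCommand (first file in the diff) ends up containing. The memory size, driver address, host, executor id, and application id are invented for illustration, and <LOG_DIR> stands in for ApplicationConstants.LOG_DIR_EXPANSION_VAR.

// Illustrative values only; the real construction is in the hunk below.
val logDir = "<LOG_DIR>"                                  // ApplicationConstants.LOG_DIR_EXPANSION_VAR
val launchCommand: List[String] = List(
  "$JAVA_HOME/bin/java",                                  // Environment.JAVA_HOME.$() + "/bin/java"
  "-server",
  "-XX:OnOutOfMemoryError='kill %p'",                     // let YARN observe the failure and reschedule
  "-Xms2048m -Xmx2048m ",                                 // "-Xms" + executorMemory + "m -Xmx" + ...
  "-Djava.io.tmpdir=$PWD/tmp",                            // Environment.PWD.$() + container temp dir
  s"-Dspark.yarn.app.container.log.dir=$logDir",          // referenced by the log4j configuration
  "org.apache.spark.executor.CoarseGrainedExecutorBackend",
  "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler",  // masterAddress (invented)
  "1",                                                    // slaveId
  "worker-host",                                          // hostname
  "4",                                                    // executorCores
  "application_1420000000000_0001",                       // appId (invented)
  "1>", s"$logDir/stdout",
  "2>", s"$logDir/stderr")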
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index fdd3c2300f..6d9198c122 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,32 +17,33 @@
package org.apache.spark.deploy.yarn
+import java.net.URI
import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
-import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.network.util.JavaUtils
-
class ExecutorRunnable(
container: Container,
conf: Configuration,
- spConf: SparkConf,
+ sparkConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
@@ -50,13 +51,13 @@ class ExecutorRunnable(
executorCores: Int,
appId: String,
securityMgr: SecurityManager)
- extends Runnable with ExecutorRunnableUtil with Logging {
+ extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
- val sparkConf = spConf
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
+ lazy val env = prepareEnvironment
+
def run = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
@@ -110,4 +111,165 @@ class ExecutorRunnable(
nmClient.startContainer(container, ctx)
}
+ private def prepareCommand(
+ masterAddress: String,
+ slaveId: String,
+ hostname: String,
+ executorMemory: Int,
+ executorCores: Int,
+ appId: String,
+ localResources: HashMap[String, LocalResource]): List[String] = {
+ // Extra options for the JVM
+ val javaOpts = ListBuffer[String]()
+
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ var prefixEnv: Option[String] = None
+
+ // Set the JVM memory
+ val executorMemoryString = executorMemory + "m"
+ javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+
+ // Set extra Java options for the executor, if defined
+ sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+ javaOpts += opts
+ }
+ sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
+ javaOpts += opts
+ }
+ sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+ prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+ }
+
+ javaOpts += "-Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
+ // Certain configs need to be passed here because they are needed before the Executor
+ // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+ // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+ // authentication settings.
+ sparkConf.getAll.
+ filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+ foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+
+ sparkConf.getAkkaConf.
+ foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out.
+ // The context is, default gc for server class machines end up using all cores to do gc - hence
+ // if there are multiple containers in same node, spark gc effects all other containers
+ // performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+ // of cores on a node.
+ /*
+ else {
+ // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+ // It might be possible that other modes/config is being done in
+ // spark.executor.extraJavaOptions, so we dont want to mess with it.
+ // In our expts, using (default) throughput collector has severe perf ramnifications in
+ // multi-tennent machines
+ // The options are based on
+ // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
+ // %20the%20Concurrent%20Low%20Pause%20Collector|outline
+ javaOpts += " -XX:+UseConcMarkSweepGC "
+ javaOpts += " -XX:+CMSIncrementalMode "
+ javaOpts += " -XX:+CMSIncrementalPacing "
+ javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+ javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+ */
+
+ // For log4j configuration to reference
+ javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+
+ val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
+ "-server",
+ // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+ // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+ // an inconsistent state.
+ // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+ // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+ "-XX:OnOutOfMemoryError='kill %p'") ++
+ javaOpts ++
+ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+ masterAddress.toString,
+ slaveId.toString,
+ hostname.toString,
+ executorCores.toString,
+ appId,
+ "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ // TODO: it would be nicer to just make sure there are no null commands here
+ commands.map(s => if (s == null) "null" else s).toList
+ }
+
+ private def setupDistributedCache(
+ file: String,
+ rtype: LocalResourceType,
+ localResources: HashMap[String, LocalResource],
+ timestamp: String,
+ size: String,
+ vis: String): Unit = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+ localResources(uri.getFragment()) = amJarRsrc
+ }
+
+ private def prepareLocalResources: HashMap[String, LocalResource] = {
+ logInfo("Preparing Local resources")
+ val localResources = HashMap[String, LocalResource]()
+
+ if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+ val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+ for( i <- 0 to distFiles.length - 1) {
+ setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+ fileSizes(i), visibilities(i))
+ }
+ }
+
+ if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+ val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+ for( i <- 0 to distArchives.length - 1) {
+ setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+ timeStamps(i), fileSizes(i), visibilities(i))
+ }
+ }
+
+ logInfo("Prepared Local resources " + localResources)
+ localResources
+ }
+
+ private def prepareEnvironment: HashMap[String, String] = {
+ val env = new HashMap[String, String]()
+ val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
+ ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+
+ sparkConf.getExecutorEnv.foreach { case (key, value) =>
+ // This assumes each executor environment variable set here is a path
+ // This is kept for backward compatibility and consistency with hadoop
+ YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
+ }
+
+ // Keep this for backwards compatibility but users should move to the config
+ sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+ YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+ }
+
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
+ env
+ }
}
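The distributed-cache handling above decodes four parallel, comma-separated environment variables set by the driver (SPARK_YARN_CACHE_FILES and its _TIME_STAMPS, _FILE_SIZES, and _VISIBILITIES companions) and keys each resource by its URI fragment. Below is a standalone sketch of that alignment, using a hypothetical CacheEntry case class in place of YARN's LocalResource so it runs without Hadoop on the classpath; the example values are invented.

import java.net.URI

// Hypothetical stand-in for org.apache.hadoop.yarn.api.records.LocalResource.
case class CacheEntry(name: String, uri: URI, timestamp: Long, size: Long, visibility: String)

def parseCacheEnv(files: String, stamps: String, sizes: String, vis: String): Seq[CacheEntry] = {
  val uris       = files.split(',').map(new URI(_))
  val timestamps = stamps.split(',').map(_.toLong)
  val byteSizes  = sizes.split(',').map(_.toLong)
  val visibility = vis.split(',')
  uris.indices.map { i =>
    // The URI fragment (after '#') is the link name YARN creates in the container's
    // working directory -- the same key used by localResources(uri.getFragment()).
    CacheEntry(uris(i).getFragment, uris(i), timestamps(i), byteSizes(i), visibility(i))
  }
}

// Example with two cached files, encoded the way the driver would set the env vars.
val entries = parseCacheEnv(
  "hdfs://nn:8020/user/app/spark.jar#__spark__.jar,hdfs://nn:8020/user/app/app.jar#__app__.jar",
  "1420500000000,1420500000001",
  "1048576,2048",
  "PUBLIC,PRIVATE")
// entries.map(_.name) == Vector("__spark__.jar", "__app__.jar")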
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
deleted file mode 100644
index 22d73ecf6d..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-
-trait ExecutorRunnableUtil extends Logging {
-
- val yarnConf: YarnConfiguration
- val sparkConf: SparkConf
- lazy val env = prepareEnvironment
-
- def prepareCommand(
- masterAddress: String,
- slaveId: String,
- hostname: String,
- executorMemory: Int,
- executorCores: Int,
- appId: String,
- localResources: HashMap[String, LocalResource]): List[String] = {
- // Extra options for the JVM
- val javaOpts = ListBuffer[String]()
-
- // Set the environment variable through a command prefix
- // to append to the existing value of the variable
- var prefixEnv: Option[String] = None
-
- // Set the JVM memory
- val executorMemoryString = executorMemory + "m"
- javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
-
- // Set extra Java options for the executor, if defined
- sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
- javaOpts += opts
- }
- sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
- javaOpts += opts
- }
- sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
- prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
- }
-
- javaOpts += "-Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-
- // Certain configs need to be passed here because they are needed before the Executor
- // registers with the Scheduler and transfers the spark configs. Since the Executor backend
- // uses Akka to connect to the scheduler, the akka settings are needed as well as the
- // authentication settings.
- sparkConf.getAll.
- filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
- foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
- sparkConf.getAkkaConf.
- foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove
- // it once cpuset version is pushed out.
- // The context is, default gc for server class machines end up using all cores to do gc - hence
- // if there are multiple containers in same node, spark gc effects all other containers
- // performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
- // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
- // of cores on a node.
- /*
- else {
- // If no java_opts specified, default to using -XX:+CMSIncrementalMode
- // It might be possible that other modes/config is being done in
- // spark.executor.extraJavaOptions, so we dont want to mess with it.
- // In our expts, using (default) throughput collector has severe perf ramnifications in
- // multi-tennent machines
- // The options are based on
- // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
- // %20the%20Concurrent%20Low%20Pause%20Collector|outline
- javaOpts += " -XX:+UseConcMarkSweepGC "
- javaOpts += " -XX:+CMSIncrementalMode "
- javaOpts += " -XX:+CMSIncrementalPacing "
- javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
- javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
- }
- */
-
- // For log4j configuration to reference
- javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
-
- val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
- "-server",
- // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
- // an inconsistent state.
- // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
- // 'something' to fail job ... akin to blacklisting trackers in mapred ?
- "-XX:OnOutOfMemoryError='kill %p'") ++
- javaOpts ++
- Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- masterAddress.toString,
- slaveId.toString,
- hostname.toString,
- executorCores.toString,
- appId,
- "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
- // TODO: it would be nicer to just make sure there are no null commands here
- commands.map(s => if (s == null) "null" else s).toList
- }
-
- private def setupDistributedCache(
- file: String,
- rtype: LocalResourceType,
- localResources: HashMap[String, LocalResource],
- timestamp: String,
- size: String,
- vis: String): Unit = {
- val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource])
- amJarRsrc.setType(rtype)
- amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
- amJarRsrc.setTimestamp(timestamp.toLong)
- amJarRsrc.setSize(size.toLong)
- localResources(uri.getFragment()) = amJarRsrc
- }
-
- def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val localResources = HashMap[String, LocalResource]()
-
- if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
- for( i <- 0 to distFiles.length - 1) {
- setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
- fileSizes(i), visibilities(i))
- }
- }
-
- if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
- val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
- for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
- timeStamps(i), fileSizes(i), visibilities(i))
- }
- }
-
- logInfo("Prepared Local resources " + localResources)
- localResources
- }
-
- def prepareEnvironment: HashMap[String, String] = {
- val env = new HashMap[String, String]()
- val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
- ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
-
- sparkConf.getExecutorEnv.foreach { case (key, value) =>
- // This assumes each executor environment variable set here is a path
- // This is kept for backward compatibility and consistency with hadoop
- YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
- }
-
- // Keep this for backwards compatibility but users should move to the config
- sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
- YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
- }
-
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
- env
- }
-
-}
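One detail worth calling out from prepareCommand, present in both the removed trait and the new class: only the spark.auth* and spark.akka* settings are forwarded as -D JVM flags, because the executor needs the authentication and Akka settings before it has registered with the scheduler and received the full SparkConf. A minimal, self-contained sketch of that filter follows; quoteForShell is a simplified stand-in for YarnSparkHadoopUtil.escapeForShell, which is not part of this diff.

// Simplified stand-in for YarnSparkHadoopUtil.escapeForShell (not reproduced in this patch).
def quoteForShell(arg: String): String = "'" + arg.replace("'", "'\\''") + "'"

// Forward only the settings the executor needs before it registers with the scheduler.
def earlyJvmOpts(conf: Seq[(String, String)]): Seq[String] =
  conf.collect {
    case (k, v) if k.startsWith("spark.auth") || k.startsWith("spark.akka") =>
      quoteForShell(s"-D$k=$v")
  }

// Example:
val opts = earlyJvmOpts(Seq(
  "spark.authenticate"   -> "true",
  "spark.akka.frameSize" -> "10",
  "spark.app.name"       -> "demo"))   // dropped: delivered later via the scheduler
// opts == List("'-Dspark.authenticate=true'", "'-Dspark.akka.frameSize=10'")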