author    Raymond Liu <raymond.liu@intel.com>  2014-01-09 14:16:07 +0800
committer Raymond Liu <raymond.liu@intel.com>  2014-01-14 10:36:00 +0800
commit    161ab93989d6fafdfb772e9230df984a5f902c7d (patch)
tree      69f62d68ed723258f93afeaec89266fe017dd632 /yarn/alpha
parent    79a5ba349779a94d16e3e5ce37ffabc47bd1b5a0 (diff)
Yarn workerRunnable refactor
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 126
1 file changed, 4 insertions(+), 122 deletions(-)
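This change replaces WorkerRunnable's inline command, environment, and local-resource setup with calls into a shared WorkerRunnableUtil trait, which the class now mixes in. The trait's source is not part of this diff, so the following is only a sketch of the interface the alpha WorkerRunnable appears to depend on, inferred from the call sites and removed methods below; the abstract members and their exact types are assumptions, not the actual trait source:

    import scala.collection.mutable.HashMap
    import org.apache.hadoop.yarn.api.records.LocalResource
    import org.apache.spark.SparkConf

    // Inferred sketch of the shared trait, reconstructed from this diff only.
    trait WorkerRunnableUtil {
      // Presumably required by the shared helpers; WorkerRunnable satisfies
      // this with `val sparkConf = spConf` in the diff below.
      val sparkConf: SparkConf

      // Matches the call added in run():
      // prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
      def prepareCommand(
          masterAddress: String,
          slaveId: String,
          hostname: String,
          workerMemory: Int,
          workerCores: Int): List[String]

      // Match the methods removed from WorkerRunnable in this diff.
      def prepareEnvironment: HashMap[String, String]
      def prepareLocalResources: HashMap[String, LocalResource]
    }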
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 1c28d6c86d..8c686e393f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -25,12 +25,10 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -43,16 +41,17 @@ import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
container: Container,
conf: Configuration,
- sparkConf: SparkConf,
+ spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int)
- extends Runnable with Logging {
+ extends Runnable with WorkerRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = _
+ val sparkConf = spConf
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
@@ -75,43 +74,6 @@ class WorkerRunnable(
val env = prepareEnvironment
ctx.setEnvironment(env)
- // Extra options for the JVM
- var JAVA_OPTS = ""
- // Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
- }
-
- JAVA_OPTS += " -Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove
- // it once cpuset version is pushed out.
- // The context is, the default gc for server-class machines ends up using all cores to do
- // gc - hence if there are multiple containers on the same node, spark gc affects the
- // performance of all other containers (which can also be other spark containers).
- // Instead of using this, rely on cpusets by YARN to ensure spark behaves 'properly' in
- // multi-tenant environments. Not sure how the default java gc behaves if it is limited to
- // a subset of cores on a node.
-/*
- else {
- // If no java_opts are specified, default to using -XX:+CMSIncrementalMode
- // It might be possible that other modes/configs are being set in SPARK_JAVA_OPTS, so we
- // don't want to mess with it.
- // In our experiments, using the (default) throughput collector has severe perf
- // ramifications on multi-tenant machines
- // The options are based on
- // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
- }
-*/
-
ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -119,28 +81,7 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- var javaCommand = "java"
- val javaHome = System.getenv("JAVA_HOME")
- if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
- javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
- }
-
- val commands = List[String](javaCommand +
- " -server " +
- // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
- // an inconsistent state.
- // TODO: If the OOM is not recoverable by rescheduling it on a different node, then do
- // 'something' to fail the job ... akin to blacklisting trackers in mapred ?
- " -XX:OnOutOfMemoryError='kill %p' " +
- JAVA_OPTS +
- " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
- masterAddress + " " +
- slaveId + " " +
- hostname + " " +
- workerCores +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
logInfo("Setting up worker with commands: " + commands)
ctx.setCommands(commands)
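
(For reference: with hypothetical values masterAddress = "spark://host:7077", slaveId = "1", hostname = "node1", workerMemory = 1024, workerCores = 2, and no SPARK_JAVA_OPTS set, the inline construction removed above would have produced a command along these lines, which prepareCommand is presumably expected to reproduce; the tmpdir path is elided:

    java -server -XX:OnOutOfMemoryError='kill %p' -Xms1024m -Xmx1024m \
      -Djava.io.tmpdir=<container working dir>/tmp \
      org.apache.spark.executor.CoarseGrainedExecutorBackend \
      spark://host:7077 1 node1 2 \
      1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr

Note that workerMemory only shapes the -Xms/-Xmx flags; the program arguments are masterAddress, slaveId, hostname, and workerCores.)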
@@ -151,65 +92,6 @@ class WorkerRunnable(
cm.startContainer(startReq)
}
- private def setupDistributedCache(
- file: String,
- rtype: LocalResourceType,
- localResources: HashMap[String, LocalResource],
- timestamp: String,
- size: String,
- vis: String) = {
- val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(rtype)
- amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
- amJarRsrc.setTimestamp(timestamp.toLong)
- amJarRsrc.setSize(size.toLong)
- localResources(uri.getFragment()) = amJarRsrc
- }
-
- def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val localResources = HashMap[String, LocalResource]()
-
- if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
- for( i <- 0 to distFiles.length - 1) {
- setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
- fileSizes(i), visibilities(i))
- }
- }
-
- if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
- val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
- val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
- val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
- val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
- for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
- timeStamps(i), fileSizes(i), visibilities(i))
- }
- }
-
- logInfo("Prepared Local resources " + localResources)
- localResources
- }
-
- def prepareEnvironment: HashMap[String, String] = {
- val env = new HashMap[String, String]()
-
- ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
- // Allow users to specify some environment variables
- Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- env
- }
-
def connectToCM: ContainerManager = {
val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
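
(An aside on the removed prepareLocalResources: it decodes the distributed-cache file list from four parallel, comma-separated environment variables whose i-th entries describe the same file. A self-contained sketch of that convention; the helper name cachedFileSpecs is illustrative, not from the source:

    // Illustrative helper: pairs up the parallel SPARK_YARN_CACHE_FILES lists
    // that the removed prepareLocalResources iterates over.
    def cachedFileSpecs(): Seq[(String, Long, Long, String)] = {
      def list(name: String): Array[String] =
        Option(System.getenv(name)).map(_.split(',')).getOrElse(Array.empty[String])
      val files        = list("SPARK_YARN_CACHE_FILES")
      val timestamps   = list("SPARK_YARN_CACHE_FILES_TIME_STAMPS")
      val sizes        = list("SPARK_YARN_CACHE_FILES_FILE_SIZES")
      val visibilities = list("SPARK_YARN_CACHE_FILES_VISIBILITIES")
      for (i <- files.indices)
        yield (files(i), timestamps(i).toLong, sizes(i).toLong, visibilities(i))
    }

The same pattern repeats for SPARK_YARN_CACHE_ARCHIVES, registered with LocalResourceType.ARCHIVE instead of LocalResourceType.FILE.)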