aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala49
1 files changed, 30 insertions, 19 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 96f8aa9339..32f8861dc9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.net.URI
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api._
@@ -44,9 +44,9 @@ trait ExecutorRunnableUtil extends Logging {
hostname: String,
executorMemory: Int,
executorCores: Int,
- localResources: HashMap[String, LocalResource]) = {
+ localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
- var JAVA_OPTS = ""
+ val JAVA_OPTS = ListBuffer[String]()
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -56,10 +56,21 @@ trait ExecutorRunnableUtil extends Logging {
JAVA_OPTS += opts
}
- JAVA_OPTS += " -Djava.io.tmpdir=" +
- new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+ JAVA_OPTS += "-Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+ // Certain configs need to be passed here because they are needed before the Executor
+ // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+ // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+ // authentication settings.
+ sparkConf.getAll.
+ filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+ foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
+ sparkConf.getAkkaConf.
+ foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence
@@ -85,25 +96,25 @@ trait ExecutorRunnableUtil extends Logging {
}
*/
- val commands = List[String](
- Environment.JAVA_HOME.$() + "/bin/java" +
- " -server " +
+ val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+ "-server",
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
- " -XX:OnOutOfMemoryError='kill %p' " +
- JAVA_OPTS +
- " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
- masterAddress + " " +
- slaveId + " " +
- hostname + " " +
- executorCores +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
- commands
+ "-XX:OnOutOfMemoryError='kill %p'") ++
+ JAVA_OPTS ++
+ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+ masterAddress.toString,
+ slaveId.toString,
+ hostname.toString,
+ executorCores.toString,
+ "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ // TODO: it would be nicer to just make sure there are no null commands here
+ commands.map(s => if (s == null) "null" else s).toList
}
private def setupDistributedCache(