diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-13 14:52:14 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-13 14:52:14 -0700 |
commit | 84979499dba39e28242c34fc3a3278af271aabfc (patch) | |
tree | 5152d033e3ef2901d000b4f72de3d788ebef3f79 | |
parent | 5b7ee173e1757cdc5a0a42892d98ff3473e9dc8d (diff) | |
parent | 0700d1920a4a33648b7461f1ca64e3a94638ad65 (diff) | |
download | spark-84979499dba39e28242c34fc3a3278af271aabfc.tar.gz spark-84979499dba39e28242c34fc3a3278af271aabfc.tar.bz2 spark-84979499dba39e28242c34fc3a3278af271aabfc.zip |
Merge pull request #273 from dennybritz/executorVars
Let the user specify environment variables to be passed to the Executors
5 files changed, 30 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 8739c8bb6d..6b81a6f259 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -92,6 +92,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars: private[spark] val addedFiles = HashMap[String, Long]() private[spark] val addedJars = HashMap[String, Long]() + // Environment variables to pass to our executors + private[spark] val executorEnvs = HashMap[String, String]() + Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", + "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key => executorEnvs.put(key, System.getenv(key)) } + + // Add each JAR given through the constructor jars.foreach { addJar(_) } @@ -433,6 +439,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars: addedJars.clear() } + /* Sets an environment variable that will be passed to the executors */ + def putExecutorEnv(key: String, value: String) { + logInfo("Setting executor environment variable " + key + "=" + value) + executorEnvs.put(key,value) + } + /** Shut down the SparkContext. */ def stop() { dagScheduler.stop() diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 73722a82e0..637e763c9e 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -116,10 +116,12 @@ private[spark] class ExecutorRunner( val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() for ((key, value) <- jobDesc.command.environment) { - env.put(key, value) + if (value == null) { + logInfo("Environment variable not set: " + key) + } else { + env.put(key, value) + } } - env.put("SPARK_CORES", cores.toString) - env.put("SPARK_MEMORY", memory.toString) // In case we are running this from within the Spark Shell // so we are not creating a parent process. env.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 88cb114544..7aba7324ab 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend( val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - // Environment variables to pass to our executors - val ENV_VARS_TO_SEND_TO_EXECUTORS = Array( - "SPARK_MEM", - "SPARK_CLASSPATH", - "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS", - "SPARK_TESTING" - ) - // Memory used by each executor (in megabytes) val executorMemory = { if (System.getenv("SPARK_MEM") != null) { @@ -42,17 +33,11 @@ private[spark] class SparkDeploySchedulerBackend( override def start() { super.start() - val environment = new HashMap[String, String] - for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { - if (System.getenv(key) != null) { - environment(key) = System.getenv(key) - } - } val masterUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), StandaloneSchedulerBackend.ACTOR_NAME) val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}") - val command = Command("spark.executor.StandaloneExecutorBackend", args, environment) + val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command) client = new Client(sc.env.actorSystem, master, jobDesc, this) diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index e6d8b9d822..29dd36be15 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -33,15 +33,6 @@ private[spark] class CoarseMesosSchedulerBackend( with MScheduler with Logging { - // Environment variables to pass to our executors - val ENV_VARS_TO_SEND_TO_EXECUTORS = Array( - "SPARK_MEM", - "SPARK_CLASSPATH", - "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS", - "SPARK_TESTING" - ) - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures // Memory used by each executor (in megabytes) @@ -123,13 +114,15 @@ private[spark] class CoarseMesosSchedulerBackend( val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores) val environment = Environment.newBuilder() - for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { - if (System.getenv(key) != null) { + sc.executorEnvs.foreach { case(key, value) => + if (value == null) { + logInfo("Environment variable not set: " + key) + } else { environment.addVariables(Environment.Variable.newBuilder() .setName(key) - .setValue(System.getenv(key)) + .setValue(value) .build()) - } + } } return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build() } diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 6f01c8c09d..c4aee5c9cb 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -29,15 +29,6 @@ private[spark] class MesosSchedulerBackend( with MScheduler with Logging { - // Environment variables to pass to our executors - val ENV_VARS_TO_SEND_TO_EXECUTORS = Array( - "SPARK_MEM", - "SPARK_CLASSPATH", - "SPARK_LIBRARY_PATH", - "SPARK_JAVA_OPTS", - "SPARK_TESTING" - ) - // Memory used by each executor (in megabytes) val EXECUTOR_MEMORY = { if (System.getenv("SPARK_MEM") != null) { @@ -94,13 +85,15 @@ private[spark] class MesosSchedulerBackend( } val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val environment = Environment.newBuilder() - for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { - if (System.getenv(key) != null) { + sc.executorEnvs.foreach { case(key, value) => + if (value == null) { + logInfo("Environment variable not set: " + key) + } else { environment.addVariables(Environment.Variable.newBuilder() .setName(key) - .setValue(System.getenv(key)) + .setValue(value) .build()) - } + } } val memory = Resource.newBuilder() .setName("mem") |