authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-13 14:52:14 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-13 14:52:14 -0700
commit84979499dba39e28242c34fc3a3278af271aabfc (patch)
tree5152d033e3ef2901d000b4f72de3d788ebef3f79
parent5b7ee173e1757cdc5a0a42892d98ff3473e9dc8d (diff)
parent0700d1920a4a33648b7461f1ca64e3a94638ad65 (diff)
Merge pull request #273 from dennybritz/executorVars
Let the user specify environment variables to be passed to the Executors
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 12
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 8
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 17
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 19
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 19
5 files changed, 30 insertions(+), 45 deletions(-)
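The new API surface is small: SparkContext gains a mutable executorEnvs map, seeded from a fixed set of SPARK_* variables, plus a putExecutorEnv setter. A minimal usage sketch, assuming the short two-argument SparkContext constructor of this era (the master URL and the variable name MY_SETTING are illustrative):

    import spark.SparkContext

    val sc = new SparkContext("spark://master:7077", "envDemo")
    // Ask the scheduler backend to export this variable into every
    // executor process it launches. Depending on the backend, this may
    // need to happen before the first executors are started.
    sc.putExecutorEnv("MY_SETTING", "42")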
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8739c8bb6d..6b81a6f259 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -92,6 +92,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
 
+  // Environment variables to pass to our executors
+  private[spark] val executorEnvs = HashMap[String, String]()
+  Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH",
+    "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key => executorEnvs.put(key, System.getenv(key)) }
+
+
   // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
@@ -433,6 +439,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
     addedJars.clear()
   }
 
+  /** Sets an environment variable that will be passed to the executors. */
+  def putExecutorEnv(key: String, value: String) {
+    logInfo("Setting executor environment variable " + key + "=" + value)
+    executorEnvs.put(key, value)
+  }
+
   /** Shut down the SparkContext. */
   def stop() {
     dagScheduler.stop()
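Note that System.getenv returns null for unset variables, so the seeded executorEnvs map can carry null values by design; the consumers in the files below are expected to skip them. A self-contained sketch of that pattern, with illustrative variable names:

    import scala.collection.mutable.HashMap

    val envs = HashMap[String, String]()
    // Seeding stores null for any variable that is not set in this shell.
    Seq("SPARK_MEM", "SOME_UNSET_VAR").foreach { key =>
      envs.put(key, System.getenv(key))
    }
    // Consumers must filter the nulls out before exporting anything.
    val defined = envs.filter { case (_, value) => value != null }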
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 73722a82e0..637e763c9e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -116,10 +116,12 @@ private[spark] class ExecutorRunner(
     val builder = new ProcessBuilder(command: _*).directory(executorDir)
     val env = builder.environment()
     for ((key, value) <- jobDesc.command.environment) {
-      env.put(key, value)
+      if (value == null) {
+        logInfo("Environment variable not set: " + key)
+      } else {
+        env.put(key, value)
+      }
     }
-    env.put("SPARK_CORES", cores.toString)
-    env.put("SPARK_MEMORY", memory.toString)
     // In case we are running this from within the Spark Shell, avoid
     // launching through Scala so we don't create a parent process.
     env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 88cb114544..7aba7324ab 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   // Memory used by each executor (in megabytes)
   val executorMemory = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -42,17 +33,11 @@ private[spark] class SparkDeploySchedulerBackend(
   override def start() {
     super.start()
 
-    val environment = new HashMap[String, String]
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
-        environment(key) = System.getenv(key)
-      }
-    }
     val masterUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
     val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
-    val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
+    val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
     val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
     client = new Client(sc.env.actorSystem, master, jobDesc, this)
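In standalone deploy mode the null filtering is deferred to ExecutorRunner on the worker; here the map just rides along inside the job description. A self-contained sketch of that flow, with class shapes inferred from the call sites above (they are assumptions, not the real Spark definitions; the environment is typed as scala.collection.Map so the mutable HashMap is accepted):

    case class Command(mainClass: String, arguments: Seq[String],
                       environment: scala.collection.Map[String, String])
    class JobDescription(val name: String, val cores: Int,
                         val memoryPerSlave: Int, val command: Command)

    val executorEnvs = scala.collection.mutable.HashMap("SPARK_MEM" -> "512m")
    val args = Seq("akka://spark@host:7077/user/StandaloneScheduler",  // illustrative URL
                   "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
    val command = Command("spark.executor.StandaloneExecutorBackend", args, executorEnvs)
    val jobDesc = new JobDescription("demo", 4, 512, command)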
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index e6d8b9d822..29dd36be15 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -33,15 +33,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   with MScheduler
   with Logging {
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
 
   // Memory used by each executor (in megabytes)
@@ -123,13 +114,15 @@ private[spark] class CoarseMesosSchedulerBackend(
     val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
       runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
+    sc.executorEnvs.foreach { case (key, value) =>
+      if (value == null) {
+        logInfo("Environment variable not set: " + key)
+      } else {
         environment.addVariables(Environment.Variable.newBuilder()
           .setName(key)
-          .setValue(System.getenv(key))
+          .setValue(value)
           .build())
-      }
+      }
     }
     return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
   }
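Both Mesos backends now build the executor environment the same way. A standalone sketch of that loop (the helper name toMesosEnv is made up; the protobuf calls mirror the ones in the diff):

    import org.apache.mesos.Protos.Environment

    def toMesosEnv(envs: scala.collection.Map[String, String]): Environment = {
      val builder = Environment.newBuilder()
      envs.foreach { case (key, value) =>
        // Skip unset variables rather than exporting the string "null".
        if (value != null) {
          builder.addVariables(Environment.Variable.newBuilder()
            .setName(key)
            .setValue(value)
            .build())
        }
      }
      builder.build()
    }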
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 6f01c8c09d..c4aee5c9cb 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -29,15 +29,6 @@ private[spark] class MesosSchedulerBackend(
   with MScheduler
   with Logging {
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   // Memory used by each executor (in megabytes)
   val EXECUTOR_MEMORY = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -94,13 +85,15 @@ private[spark] class MesosSchedulerBackend(
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
+    sc.executorEnvs.foreach { case (key, value) =>
+      if (value == null) {
+        logInfo("Environment variable not set: " + key)
+      } else {
         environment.addVariables(Environment.Variable.newBuilder()
           .setName(key)
-          .setValue(System.getenv(key))
+          .setValue(value)
           .build())
-      }
+      }
     }
     val memory = Resource.newBuilder()
       .setName("mem")