author    Denny <dennybritz@gmail.com>  2012-10-13 13:08:44 -0700
committer Denny <dennybritz@gmail.com>  2012-10-13 13:08:44 -0700
commit    67c42a41d09b6f134d1d8d5387dd331fea49a737 (patch)
tree      b4c48fa14f2dacee25fe023dc96563397712feb4 /core
parent    5b7ee173e1757cdc5a0a42892d98ff3473e9dc8d (diff)
download  spark-67c42a41d09b6f134d1d8d5387dd331fea49a737.tar.gz
          spark-67c42a41d09b6f134d1d8d5387dd331fea49a737.tar.bz2
          spark-67c42a41d09b6f134d1d8d5387dd331fea49a737.zip
Let the user specify environment variables to be passed to the Executors.
Also removed unused variables in the ExecutorRunner.
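
With this change a user can forward custom environment variables to every
executor before submitting work. A minimal usage sketch (the constructor
arguments and the variable name are illustrative, not part of this commit):

    // Hypothetical driver program using the new API added below.
    val sc = new SparkContext("spark://master:7077", "MyJob", "/opt/spark", Nil)
    // Forwarded to executors via the job's Command environment.
    sc.putExecutorEnv("MY_APP_CONFIG", "/etc/myapp.conf")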
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                  | 12
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala                  |  2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 17
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala   | 19
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala         | 19
5 files changed, 21 insertions(+), 48 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8739c8bb6d..6b81a6f259 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -92,6 +92,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
+ // Environment variables to pass to our executors.
+ // Only copy variables that are actually set in the driver's environment;
+ // a null value would fail later when the executor command is built.
+ private[spark] val executorEnvs = HashMap[String, String]()
+ Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH",
+   "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key =>
+   val value = System.getenv(key)
+   if (value != null) {
+     executorEnvs.put(key, value)
+   }
+ }
+
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
@@ -433,6 +439,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
addedJars.clear()
}
+ /** Sets an environment variable that will be passed to the executors. */
+ def putExecutorEnv(key: String, value: String) {
+   logInfo("Setting executor environment variable " + key + "=" + value)
+   executorEnvs.put(key, value)
+ }
+
/** Shut down the SparkContext. */
def stop() {
dagScheduler.stop()
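
Because executorEnvs is a plain mutable HashMap, a putExecutorEnv call made
after SparkContext construction overrides any value seeded from the driver's
environment. A small illustration, assuming SPARK_MEM was set in the driver's
shell:

    sc.executorEnvs.get("SPARK_MEM")      // e.g. Some("512m"), copied from the driver env
    sc.putExecutorEnv("SPARK_MEM", "1g")  // executors will now see SPARK_MEM=1g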
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 73722a82e0..303cbbda00 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -118,8 +118,6 @@ private[spark] class ExecutorRunner(
for ((key, value) <- jobDesc.command.environment) {
env.put(key, value)
}
- env.put("SPARK_CORES", cores.toString)
- env.put("SPARK_MEMORY", memory.toString)
// In case we are running this from within the Spark Shell
// and so are not creating a parent process.
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 88cb114544..7aba7324ab 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
- // Environment variables to pass to our executors
- val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
- "SPARK_MEM",
- "SPARK_CLASSPATH",
- "SPARK_LIBRARY_PATH",
- "SPARK_JAVA_OPTS",
- "SPARK_TESTING"
- )
-
// Memory used by each executor (in megabytes)
val executorMemory = {
if (System.getenv("SPARK_MEM") != null) {
@@ -42,17 +33,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def start() {
super.start()
- val environment = new HashMap[String, String]
- for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
- if (System.getenv(key) != null) {
- environment(key) = System.getenv(key)
- }
- }
val masterUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
- val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
+ val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
client = new Client(sc.env.actorSystem, master, jobDesc, this)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index e6d8b9d822..37746827a8 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -33,15 +33,6 @@ private[spark] class CoarseMesosSchedulerBackend(
with MScheduler
with Logging {
- // Environment variables to pass to our executors
- val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
- "SPARK_MEM",
- "SPARK_CLASSPATH",
- "SPARK_LIBRARY_PATH",
- "SPARK_JAVA_OPTS",
- "SPARK_TESTING"
- )
-
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Memory used by each executor (in megabytes)
@@ -123,13 +114,11 @@ private[spark] class CoarseMesosSchedulerBackend(
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
- for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
- if (System.getenv(key) != null) {
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(System.getenv(key))
+ sc.executorEnvs.foreach { case (k, v) =>
+   environment.addVariables(Environment.Variable.newBuilder()
+     .setName(k)
+     .setValue(v)  // v is already the value; System.getenv(v) would wrongly treat it as a key
.build())
- }
}
return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
}
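
For reference, the Mesos protobuf builders used above compose the same way in
isolation. A standalone sketch (the helper name is illustrative) that turns an
arbitrary key/value map into a Mesos Environment message:

    import org.apache.mesos.Protos.Environment

    // Build a Mesos Environment protobuf from a Scala map of variables.
    def buildEnvironment(vars: Map[String, String]): Environment = {
      val builder = Environment.newBuilder()
      for ((name, value) <- vars) {
        builder.addVariables(Environment.Variable.newBuilder()
          .setName(name)
          .setValue(value)
          .build())
      }
      builder.build()
    }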
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 6f01c8c09d..f3bee8e1e7 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -29,15 +29,6 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with Logging {
- // Environment variables to pass to our executors
- val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
- "SPARK_MEM",
- "SPARK_CLASSPATH",
- "SPARK_LIBRARY_PATH",
- "SPARK_JAVA_OPTS",
- "SPARK_TESTING"
- )
-
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
@@ -94,13 +85,11 @@ private[spark] class MesosSchedulerBackend(
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
- for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
- if (System.getenv(key) != null) {
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(System.getenv(key))
+ sc.executorEnvs.foreach { case (k, v) =>
+   environment.addVariables(Environment.Variable.newBuilder()
+     .setName(k)
+     .setValue(v)  // use the stored value directly, not System.getenv(v)
.build())
- }
}
val memory = Resource.newBuilder()
.setName("mem")