aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-15 19:42:26 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-15 19:42:26 -0700
commit6c1dee2e42587e5722bdec86bd102c120e2e1dee (patch)
tree31f0dd2a472119dd5895d90222bd34512f0ab782
parentecb1af576e5e0be9f67cf0375f998d1fa7ac3a07 (diff)
downloadspark-6c1dee2e42587e5722bdec86bd102c120e2e1dee.tar.gz
spark-6c1dee2e42587e5722bdec86bd102c120e2e1dee.tar.bz2
spark-6c1dee2e42587e5722bdec86bd102c120e2e1dee.zip
Added code so that Spark jobs can be launched from outside the Spark
directory by setting SPARK_HOME and locating the executor relative to that. Entries on SPARK_CLASSPATH and SPARK_LIBRARY_PATH are also passed along to worker nodes.
-rwxr-xr-xrun14
-rw-r--r--src/scala/spark/MesosScheduler.scala31
2 files changed, 39 insertions, 6 deletions
diff --git a/run b/run
index 8be8f73220..627f44a563 100755
--- a/run
+++ b/run
@@ -3,14 +3,20 @@
# Figure out where the Scala framework is installed
FWDIR=`dirname $0`
+# Export this as SPARK_HOME
+export SPARK_HOME="$FWDIR"
+
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+MESOS_CLASSPATH=""
+MESOS_LIBRARY_PATH=""
+
if [ "x$MESOS_HOME" != "x" ] ; then
- SPARK_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar:$SPARK_CLASSPATH"
- SPARK_LIBRARY_PATH="$MESOS_HOME/lib/java:$SPARK_LIBARY_PATH"
+ MESOS_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar"
+ MESOS_LIBRARY_PATH="$MESOS_HOME/lib/java"
fi
if [ "x$SPARK_MEM" == "x" ] ; then
@@ -19,7 +25,7 @@ fi
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native"
+JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native:$MESOS_LIBRARY_PATH"
JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then
@@ -28,7 +34,7 @@ fi
export JAVA_OPTS
# Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes:$MESOS_CLASSPATH"
CLASSPATH+=:$FWDIR/conf
CLASSPATH+=:$FWDIR/third_party/mesos.jar
CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 40680a625f..bc24bf37fd 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -28,6 +28,13 @@ private class MesosScheduler(
master: String, frameworkName: String, execArg: Array[Byte])
extends MScheduler with spark.Scheduler with Logging
{
+ // Environment variables to pass to our executors
+ val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+ "SPARK_MEM",
+ "SPARK_CLASSPATH",
+ "SPARK_LIBRARY_PATH"
+ )
+
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
@@ -70,8 +77,28 @@ extends MScheduler with spark.Scheduler with Logging
override def getFrameworkName(d: SchedulerDriver): String = frameworkName
- override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo =
- new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
+ // Get Spark's home location from either the spark.home Java property
+ // or the SPARK_HOME environment variable (in that order of preference).
+ // If neither of these is set, throws an exception.
+ def getSparkHome(): String = {
+ if (System.getProperty("spark.home") != null)
+ System.getProperty("spark.home")
+ else if (System.getenv("SPARK_HOME") != null)
+ System.getenv("SPARK_HOME")
+ else
+ throw new SparkException("Spark home is not set; either set the " +
+ "spark.home system property or the SPARK_HOME environment variable")
+ }
+
+ override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
+ val execScript = new File(getSparkHome, "spark-executor").getCanonicalPath
+ val params = new JHashMap[String, String]
+ for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+ if (System.getenv(key) != null)
+ params(key) = System.getenv(key)
+ }
+ new ExecutorInfo(execScript, execArg)
+ }
/**
* The primary means to submit a job to the scheduler. Given a list of tasks,