From 6c1dee2e42587e5722bdec86bd102c120e2e1dee Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 15 Oct 2010 19:42:26 -0700
Subject: Added code so that Spark jobs can be launched from outside the Spark
 directory by setting SPARK_HOME and locating the executor relative to that.
 Entries on SPARK_CLASSPATH and SPARK_LIBRARY_PATH are also passed along to
 worker nodes.

---
 run                                  | 14 ++++++++++----
 src/scala/spark/MesosScheduler.scala | 31 +++++++++++++++++++++++++++++--
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/run b/run
index 8be8f73220..627f44a563 100755
--- a/run
+++ b/run
@@ -3,14 +3,20 @@
 # Figure out where the Scala framework is installed
 FWDIR=`dirname $0`
 
+# Export this as SPARK_HOME
+export SPARK_HOME="$FWDIR"
+
 # Load environment variables from conf/spark-env.sh, if it exists
 if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
 fi
 
+MESOS_CLASSPATH=""
+MESOS_LIBRARY_PATH=""
+
 if [ "x$MESOS_HOME" != "x" ] ; then
-  SPARK_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar:$SPARK_CLASSPATH"
-  SPARK_LIBRARY_PATH="$MESOS_HOME/lib/java:$SPARK_LIBARY_PATH"
+  MESOS_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar"
+  MESOS_LIBRARY_PATH="$MESOS_HOME/lib/java"
 fi
 
 if [ "x$SPARK_MEM" == "x" ] ; then
@@ -19,7 +25,7 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native"
+JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native:$MESOS_LIBRARY_PATH"
 JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e $FWDIR/conf/java-opts ] ; then
@@ -28,7 +34,7 @@ fi
 export JAVA_OPTS
 
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes:$MESOS_CLASSPATH"
 CLASSPATH+=:$FWDIR/conf
 CLASSPATH+=:$FWDIR/third_party/mesos.jar
 CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 40680a625f..bc24bf37fd 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -28,6 +28,13 @@ private class MesosScheduler(
   master: String, frameworkName: String, execArg: Array[Byte])
 extends MScheduler with spark.Scheduler with Logging
 {
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH"
+  )
+
   // Lock used to wait for scheduler to be registered
   var isRegistered = false
   val registeredLock = new Object()
@@ -70,8 +77,28 @@
 
   override def getFrameworkName(d: SchedulerDriver): String = frameworkName
 
-  override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo =
-    new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
+  // Get Spark's home location from either the spark.home Java property
+  // or the SPARK_HOME environment variable (in that order of preference).
+  // If neither of these is set, throws an exception.
+  def getSparkHome(): String = {
+    if (System.getProperty("spark.home") != null)
+      System.getProperty("spark.home")
+    else if (System.getenv("SPARK_HOME") != null)
+      System.getenv("SPARK_HOME")
+    else
+      throw new SparkException("Spark home is not set; either set the " +
+        "spark.home system property or the SPARK_HOME environment variable")
+  }
+
+  override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
+    val execScript = new File(getSparkHome, "spark-executor").getCanonicalPath
+    val params = new JHashMap[String, String]
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null)
+        params.put(key, System.getenv(key))
+    }
+    new ExecutorInfo(execScript, execArg, params)
+  }
 
   /**
    * The primary means to submit a job to the scheduler. Given a list of tasks,
-- 
cgit v1.2.3
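With this change, the driver process no longer has to be started from inside the Spark checkout: getSparkHome() resolves spark-executor against the spark.home property or SPARK_HOME, and the variables listed in ENV_VARS_TO_SEND_TO_EXECUTORS travel to the worker nodes. A minimal usage sketch follows; the install path, job jar, main class, and master URL are illustrative assumptions, not taken from the patch:

  # Point Spark at its install directory; the driver's getSparkHome()
  # reads this (or -Dspark.home) to locate the spark-executor script.
  export SPARK_HOME=/opt/spark

  # Extra entries forwarded to executors via ENV_VARS_TO_SEND_TO_EXECUTORS
  export SPARK_CLASSPATH=/opt/myjob/myjob.jar
  export SPARK_LIBRARY_PATH=/opt/myjob/native

  # The working directory no longer needs to be the Spark checkout;
  # run locates FWDIR from its own path and exports it as SPARK_HOME.
  cd /tmp
  "$SPARK_HOME/run" myjob.Main mesos://master:5050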