-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 33
-rwxr-xr-x  run                                                          | 18
-rw-r--r--  run2.cmd                                                     |  7
3 files changed, 45 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 04a774658e..4d31657d9e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -77,9 +77,35 @@ private[spark] class ExecutorRunner(
  def buildCommandSeq(): Seq[String] = {
    val command = appDesc.command
-    val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
-    val runScript = new File(sparkHome, script).getCanonicalPath
-    Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables)
+    val runner = if (System.getenv("JAVA_HOME") == null) {
+      "java"
+    } else {
+      System.getenv("JAVA_HOME") + "/bin/java"
+    }
+    // SPARK-698: do not call the run.cmd script, as process.destroy()
+    // fails to kill a process tree on Windows
+    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
+      command.arguments.map(substituteVariables)
+  }
+
+  /*
+   * Attention: this must be kept in sync with the environment variables set in the run scripts
+   * and with the way the JAVA_OPTS are assembled there.
+   */
+  def buildJavaOpts(): Seq[String] = {
+    val _javaLibPath = if (System.getenv("SPARK_LIBRARY_PATH") == null) {
+      ""
+    } else {
+      "-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH")
+    }
+
+    Seq("-cp",
+      System.getenv("CLASSPATH"),
+      System.getenv("SPARK_JAVA_OPTS"),
+      _javaLibPath,
+      "-Xms" + memory.toString + "M",
+      "-Xmx" + memory.toString + "M")
+      .filter(opt => opt != null && opt.nonEmpty) // drop unset env vars and the empty library-path placeholder
  }
/** Spawn a thread that will redirect a given stream to a file */
@@ -115,7 +141,6 @@ private[spark] class ExecutorRunner(
      for ((key, value) <- appDesc.command.environment) {
        env.put(key, value)
      }
-      env.put("SPARK_MEM", memory.toString + "m")
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/run b/run
index e656e38ccf..0fb15f8b24 100755
--- a/run
+++ b/run
@@ -23,26 +23,29 @@ fi
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
  SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
  SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-  SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS   # Empty by default
+  # Do not overwrite the SPARK_JAVA_OPTS environment variable in this script
+  OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS"   # Empty by default
+else
+  OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
fi

# Add java opts for master, worker, executor. The opts may be null.
case "$1" in
  'spark.deploy.master.Master')
-    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS"
+    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
    ;;
  'spark.deploy.worker.Worker')
-    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS"
+    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
    ;;
  'spark.executor.StandaloneExecutorBackend')
-    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
    ;;
  'spark.executor.MesosExecutorBackend')
-    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
    ;;
  'spark.repl.Main')
-    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
    ;;
esac
@@ -84,7 +87,7 @@ fi
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
-JAVA_OPTS="$SPARK_JAVA_OPTS"
+JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
@@ -92,6 +95,7 @@ if [ -e $FWDIR/conf/java-opts ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS
+# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!

CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
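
Why the script must no longer overwrite SPARK_JAVA_OPTS: a Worker started through run inherits the script's environment, and buildJavaOpts() above reads SPARK_JAVA_OPTS from that environment when launching executors. A small sketch of the leak this avoids (the flag value is illustrative):

    // Inside the Worker JVM, as in buildJavaOpts() above:
    val opts = System.getenv("SPARK_JAVA_OPTS")
    // Had run still assigned SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS for
    // daemons, `opts` could have carried daemon-only flags such as
    // -Dspark.akka.logLifecycleEvents=true into every executor the Worker
    // spawns. Keeping the daemon flags in the script-local OUR_JAVA_OPTS
    // leaves SPARK_JAVA_OPTS exactly as the user set it.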
diff --git a/run2.cmd b/run2.cmd
index c6f43dde5b..bf76844d11 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -23,7 +23,9 @@ if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
-if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
+rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script
+if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
+if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%

rem Check that SCALA_HOME has been specified
if not "x%SCALA_HOME%"=="x" goto scala_exists
@@ -40,9 +42,10 @@ rem variable so that our process sees it and can report it to Mesos
if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m

rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
rem Load extra JAVA_OPTS from conf/java-opts, if it exists
if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd"
+rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!

set CORE_DIR=%FWDIR%core
set REPL_DIR=%FWDIR%repl
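
Taken together, and assuming the run script has set CLASSPATH and SPARK_JAVA_OPTS in the Worker's environment, buildCommandSeq() now yields a command along these lines (all paths and option values are illustrative, for memory = 1024 and JAVA_HOME unset):

    // Hypothetical output of buildCommandSeq(); nothing here is a real path.
    Seq("java",
      "-cp", "/opt/spark/conf:/opt/spark/core/target/classes",  // CLASSPATH
      "-Dspark.akka.frameSize=10",                              // SPARK_JAVA_OPTS
      "-Xms1024M", "-Xmx1024M",
      "spark.executor.StandaloneExecutorBackend",
      "arg1", "arg2")
    // This Seq goes straight to ProcessBuilder; no run/run.cmd shell wrapper
    // sits between the Worker and the executor JVM any more.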