diff options
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 33 | ||||
-rwxr-xr-x | run | 18 | ||||
-rw-r--r-- | run2.cmd | 7 |
3 files changed, 45 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 04a774658e..4d31657d9e 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -77,9 +77,35 @@ private[spark] class ExecutorRunner( def buildCommandSeq(): Seq[String] = { val command = appDesc.command - val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run" - val runScript = new File(sparkHome, script).getCanonicalPath - Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables) + val runner = if (System.getenv("JAVA_HOME") == null) { + "java" + } else { + System.getenv("JAVA_HOME") + "/bin/java" + } + // SPARK-698: do not call the run.cmd script, as process.destroy() + // fails to kill a process tree on Windows + Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ + command.arguments.map(substituteVariables) + } + + /* + * Attention: this must always be aligned with the environment variables in the run scripts and the + * way the JAVA_OPTS are assembled there. + */ + def buildJavaOpts(): Seq[String] = { + val _javaLibPath = if (System.getenv("SPARK_LIBRARY_PATH") == null) { + "" + } else { + "-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH") + } + + Seq("-cp", + System.getenv("CLASSPATH"), + System.getenv("SPARK_JAVA_OPTS"), + _javaLibPath, + "-Xms" + memory.toString + "M", + "-Xmx" + memory.toString + "M") + .filter(_ != null) } /** Spawn a thread that will redirect a given stream to a file */ @@ -115,7 +141,6 @@ private[spark] class ExecutorRunner( for ((key, value) <- appDesc.command.environment) { env.put(key, value) } - env.put("SPARK_MEM", memory.toString + "m") // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") @@ -23,26 +23,29 @@ fi if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" - SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default + # Do not overwrite SPARK_JAVA_OPTS environment variable in this script + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default +else + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" fi # Add java opts for master, worker, executor. The opts maybe null case "$1" in 'spark.deploy.master.Master') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" ;; 'spark.deploy.worker.Worker') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" ;; 'spark.executor.StandaloneExecutorBackend') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.executor.MesosExecutorBackend') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.repl.Main') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" ;; esac @@ -84,7 +87,7 @@ fi export SPARK_MEM # Set JAVA_OPTS to be able to load native libraries and to set heap size -JAVA_OPTS="$SPARK_JAVA_OPTS" +JAVA_OPTS="$OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" # Load extra JAVA_OPTS from conf/java-opts, if it exists @@ -92,6 +95,7 @@ if [ -e $FWDIR/conf/java-opts ] ; then JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" fi export JAVA_OPTS +# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! CORE_DIR="$FWDIR/core" REPL_DIR="$FWDIR/repl" @@ -23,7 +23,9 @@ if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1 if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY% -if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% +rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script +if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% +if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% rem Check that SCALA_HOME has been specified if not "x%SCALA_HOME%"=="x" goto scala_exists @@ -40,9 +42,10 @@ rem variable so that our process sees it and can report it to Mesos if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m rem Set JAVA_OPTS to be able to load native libraries and to set heap size -set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% +set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% rem Load extra JAVA_OPTS from conf/java-opts, if it exists if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd" +rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! set CORE_DIR=%FWDIR%core set REPL_DIR=%FWDIR%repl |