aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bin/compute-classpath.cmd3
-rw-r--r--[-rwxr-xr-x]bin/spark-class2.cmd46
-rwxr-xr-xbin/spark-submit2
-rw-r--r--bin/spark-submit.cmd34
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala19
-rw-r--r--python/pyspark/java_gateway.py17
6 files changed, 95 insertions, 26 deletions
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 58710cd1bd..5ad52452a5 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+
if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
set ASSEMBLY_JAR=%%d
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index e2c5f9c385..6c56728191 100755..100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
+rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
setlocal enabledelayedexpansion
set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given
if not "x%SPARK_MEM%"=="x" (
echo Warning: SPARK_MEM is deprecated, please use a more specific config option
- echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+ echo e.g., spark.executor.memory or spark.driver.memory.
)
rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+ set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+ if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+ set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+ ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+ set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
+ )
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+ if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
) else (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
@@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
if "%jversion%" geq "1.8.0" (
- set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+ set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
) else (
- set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+ set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
)
rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
@@ -115,5 +125,27 @@ rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
-"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
+rem to prepare the launch environment of this driver JVM.
+
+rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
+rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
+rem be done here because the Windows "shift" command does not work in a conditional block.
+set BOOTSTRAP_ARGS=
+shift
+:start_parse
+if "%~1" == "" goto end_parse
+set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
+shift
+goto start_parse
+:end_parse
+
+if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
+ set SPARK_CLASS=1
+ "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
+) else (
+ "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
:exit
diff --git a/bin/spark-submit b/bin/spark-submit
index 32c911cd04..277c4ce571 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")
diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index 6eb702ed8c..cf6046d154 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
set SPARK_HOME=%~dp0..
set ORIG_ARGS=%*
-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
set SPARK_SUBMIT_OPTS=
-set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
:loop
if [%1] == [] goto continue
if [%1] == [--deploy-mode] (
- set DEPLOY_MODE=%2
+ set SPARK_SUBMIT_DEPLOY_MODE=%2
+ ) else if [%1] == [--properties-file] (
+ set SPARK_SUBMIT_PROPERTIES_FILE=%2
) else if [%1] == [--driver-memory] (
- set DRIVER_MEMORY=%2
+ set SPARK_SUBMIT_DRIVER_MEMORY=%2
) else if [%1] == [--driver-library-path] (
set SPARK_SUBMIT_LIBRARY_PATH=%2
) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
goto loop
:continue
-if [%DEPLOY_MODE%] == [] (
- set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.
-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
- set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+ if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+ rem Parse the properties file only if the special configs exist
+ for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+ %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+ set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+ )
+ )
)
cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index af607e6a4a..7ca96ed57c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
val process = builder.start()
// Redirect stdin, stdout, and stderr to/from the child JVM
- val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
- stdinThread.start()
stdoutThread.start()
stderrThread.start()
- // Terminate on broken pipe, which signals that the parent process has exited. This is
- // important for the PySpark shell, where Spark submit itself is a python subprocess.
- stdinThread.join()
- process.destroy()
+ // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
+ // a thread that contends with the subprocess in reading from System.in.
+ if (Utils.isWindows) {
+ // For the PySpark shell, the termination of this process is handled in java_gateway.py
+ process.waitFor()
+ } else {
+ // Terminate on broken pipe, which signals that the parent process has exited. This is
+ // important for the PySpark shell, where Spark submit itself is a python subprocess.
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ stdinThread.start()
+ stdinThread.join()
+ process.destroy()
+ }
}
}
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6f4f62f23b..9c70fa5c16 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import atexit
import os
import sys
import signal
@@ -69,6 +70,22 @@ def launch_gateway():
error_msg += "--------------------------------------------------------------\n"
raise Exception(error_msg)
+ # In Windows, ensure the Java child processes do not linger after Python has exited.
+ # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
+ # the parent process' stdin sends an EOF). In Windows, however, this is not possible
+ # because java.lang.Process reads directly from the parent process' stdin, contending
+ # with any opportunity to read an EOF from the parent. Note that this is only best
+ # effort and will not take effect if the python process is violently terminated.
+ if on_windows:
+ # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+ # (because the UNIX "exec" command is not available). This means we cannot simply
+ # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+ # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+ # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
+ def killChild():
+ Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+ atexit.register(killChild)
+
# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):