-rwxr-xr-x  bin/pyspark                                                               35
-rw-r--r--  bin/pyspark2.cmd                                                          21
-rwxr-xr-x  bin/spark-shell                                                            6
-rwxr-xr-x  bin/spark-shell.cmd                                                        2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala             2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala             55
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala     6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                      2
-rw-r--r--  python/pyspark/java_gateway.py                                            10
-rw-r--r--  python/pyspark/shell.py                                                     2
10 files changed, 107 insertions, 34 deletions
diff --git a/bin/pyspark b/bin/pyspark
index 10e35e0f17..9e1364e44c 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"
SCALA_VERSION=2.10
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+ echo "Usage: ./bin/pyspark [options]"
+ ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+ exit 0
+fi
+
# Exit if the user hasn't compiled Spark
if [ ! -f "$FWDIR/RELEASE" ]; then
# Exit if the user hasn't compiled Spark
@@ -52,13 +58,34 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
+# If IPython options are specified, assume user wants to run IPython
if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi
-# Only use ipython if no command line arguments were provided [SPARK-1134]
-if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
- exec ipython $IPYTHON_OPTS
+# Build up arguments list manually to preserve quotes and backslashes.
+# We export Spark submit arguments as an environment variable because shell.py must run as a
+# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
+
+PYSPARK_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "$@"; do
+ if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
+ if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
+ PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
+done
+export PYSPARK_SUBMIT_ARGS
+
+# If a python file is provided, directly run spark-submit.
+if [[ "$1" =~ \.py$ ]]; then
+ echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
+ echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
+ exec $FWDIR/bin/spark-submit "$@"
else
- exec "$PYSPARK_PYTHON" "$@"
+ # Only use ipython if no command line arguments were provided [SPARK-1134]
+ if [[ "$IPYTHON" = "1" ]]; then
+ exec ipython $IPYTHON_OPTS
+ else
+ exec "$PYSPARK_PYTHON"
+ fi
fi
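
Note: the quoting loop added above exists so that the argument string survives a round trip
through the environment: bin/pyspark exports PYSPARK_SUBMIT_ARGS, and python/pyspark/java_gateway.py
later splits it back into a list with shlex. A minimal sketch of that contract, assuming a
hypothetical invocation ./bin/pyspark --master local[4] --name "My App":

    import shlex

    # Value the loop above would export (the argument containing whitespace gets double-quoted):
    pyspark_submit_args = ' --master local[4] --name "My App"'

    # java_gateway.py recovers the original argument list:
    assert shlex.split(pyspark_submit_args) == ["--master", "local[4]", "--name", "My App"]
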
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index d7cfd5eec5..0ef9eea953 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -31,7 +31,7 @@ set FOUND_JAR=0
for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
set FOUND_JAR=1
)
-if "%FOUND_JAR%"=="0" (
+if [%FOUND_JAR%] == [0] (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
goto exit
@@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Figure out which Python to use.
-if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python
set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+set PYSPARK_SUBMIT_ARGS=%*
echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
-"%PYSPARK_PYTHON%" %*
+rem Check whether the argument is a file
+for /f %%i in ('echo %1^| findstr /R "\.py"') do (
+ set PYTHON_FILE=%%i
+)
+
+if [%PYTHON_FILE%] == [] (
+ %PYSPARK_PYTHON%
+) else (
+ echo.
+ echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
+ echo Use ./bin/spark-submit ^<python file^>
+ echo.
+ "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
+)
+
:exit
diff --git a/bin/spark-shell b/bin/spark-shell
index 7f03349c5e..c158683ab3 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -28,7 +28,7 @@ esac
# Enter posix mode for bash
set -o posix
-if [[ "$@" == *--help* ]]; then
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./bin/spark-shell [options]"
./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
@@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
- $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+ $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
- $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+ $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
fi
}
diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd
index ca0c722c92..4b9708a8c0 100755
--- a/bin/spark-shell.cmd
+++ b/bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem
set SPARK_HOME=%~dp0..
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index e20d4486c8..2dfa02bd26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -42,7 +42,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
- pathElements ++= pyFiles.split(",")
+ pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e86182e4c5..a99b2176e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL
/**
- * A special jar name that indicates the class being run is inside of Spark itself,
- * and therefore no user jar is needed.
+ * Special primary resource names that represent shells rather than application jars.
*/
- private val RESERVED_JAR_NAME = "spark-internal"
+ private val SPARK_SHELL = "spark-shell"
+ private val PYSPARK_SHELL = "pyspark-shell"
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
- private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
- ArrayBuffer[String], Map[String, String], String) = {
+ private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+ : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}
- // If we're running a Python app, set the Java class to run to be our PythonRunner, add
- // Python files to deployment list, and pass the main file and Python path to PythonRunner
+ // If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
- args.mainClass = "org.apache.spark.deploy.PythonRunner"
- args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+ if (args.primaryResource == PYSPARK_SHELL) {
+ args.mainClass = "py4j.GatewayServer"
+ args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+ } else {
+ // If a python file is provided, add it to the child arguments and list of files to deploy.
+ // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+ args.mainClass = "org.apache.spark.deploy.PythonRunner"
+ args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+ args.files = mergeFileLists(args.files, args.primaryResource)
+ }
val pyFiles = Option(args.pyFiles).getOrElse("")
- args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
- args.primaryResource = RESERVED_JAR_NAME
+ args.files = mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}
// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
- if (args.primaryResource != RESERVED_JAR_NAME) {
+ if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
- if (args.primaryResource != RESERVED_JAR_NAME) {
+ if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
@@ -293,7 +299,7 @@ object SparkSubmit {
}
private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
- val localJarFile = new File(new URI(localJar).getPath())
+ val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists()) {
printWarning(s"Jar $localJar does not exist, skipping.")
}
@@ -303,6 +309,27 @@ object SparkSubmit {
}
/**
+ * Return whether the given primary resource represents a user jar.
+ */
+ private def isUserJar(primaryResource: String): Boolean = {
+ !isShell(primaryResource) && !isPython(primaryResource)
+ }
+
+ /**
+ * Return whether the given primary resource represents a shell.
+ */
+ private def isShell(primaryResource: String): Boolean = {
+ primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+ }
+
+ /**
+ * Return whether the given primary resource requires running python.
+ */
+ private[spark] def isPython(primaryResource: String): Boolean = {
+ primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+ }
+
+ /**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2d327aa3fb..264d4544cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
- isPython = v.endsWith(".py")
+ isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
- childArgs += value
+ if (!value.isEmpty) {
+ childArgs += value
+ }
parse(tail)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 388f722242..0c7cff019f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging {
* Strip the directory from a path name
*/
def stripDirectory(path: String): String = {
- path.split(File.separator).last
+ new File(path).getName
}
/**
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 3d0936fdca..91ae8263f6 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -18,12 +18,12 @@
import os
import sys
import signal
+import shlex
import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
@@ -34,9 +34,11 @@ def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
- script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
- command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
- "--die-on-broken-pipe", "0"]
+ script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
+ submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
+ submit_args = submit_args if submit_args is not None else ""
+ submit_args = shlex.split(submit_args)
+ command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
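
Note: with this change the gateway is launched through spark-submit rather than spark-class, and
any options the user passed to bin/pyspark travel via PYSPARK_SUBMIT_ARGS. A short sketch of the
command that gets built on a non-Windows host, assuming hypothetical values
SPARK_HOME=/opt/spark and PYSPARK_SUBMIT_ARGS="--master local[4]":

    import os
    import shlex

    os.environ["SPARK_HOME"] = "/opt/spark"                  # assumed for the example
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[4]"  # assumed for the example

    submit_args = shlex.split(os.environ.get("PYSPARK_SUBMIT_ARGS") or "")
    command = [os.path.join(os.environ["SPARK_HOME"], "./bin/spark-submit"), "pyspark-shell"] + submit_args
    # command == ['/opt/spark/./bin/spark-submit', 'pyspark-shell', '--master', 'local[4]']
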
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index d172d588bf..ebd714db7a 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -40,7 +40,7 @@ add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES"
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
-sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
print("""Welcome to
____ __