path: root/core
author     Andrew Or <andrewor14@gmail.com>        2014-05-16 22:34:38 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-05-16 22:34:38 -0700
commit     4b8ec6fcfd7a7ef0857d5b21917183c181301c95 (patch)
tree       d20ce09d28fac8caf0cec1ef68fbf72a4b3b62a1 /core
parent     c0ab85d7320cea90e6331fb03a70349bc804c1b1 (diff)
[SPARK-1808] Route bin/pyspark through Spark submit
**Problem.** For `bin/pyspark`, there is currently no way to specify Spark configuration properties other than through `SPARK_JAVA_OPTS` in `conf/spark-env.sh`. However, this mechanism is supposedly deprecated. Instead, it needs to pick up configurations explicitly specified in `conf/spark-defaults.conf`.

**Solution.** Have `bin/pyspark` invoke `bin/spark-submit`, like all of its counterparts in Scala land (i.e. `bin/spark-shell`, `bin/run-example`). This has the additional benefit of making the invocation of all the user-facing Spark scripts consistent.

**Details.** `bin/pyspark` inherently handles two cases: (1) running python applications and (2) running the python shell. For (1), Spark submit already handles running python applications. For cases in which `bin/pyspark` is given a python file, we can simply pass the file directly to Spark submit and let it handle the rest. For case (2), `bin/pyspark` starts a python process as before, which launches the JVM as a sub-process. The existing code already provides a code path to do this. All we need to change is to use `bin/spark-submit` instead of `spark-class` to launch the JVM. This requires modifications to Spark submit to handle the pyspark shell as a special case.

This has been tested locally (OSX and Windows 7), on a standalone cluster, and on a YARN cluster. Running IPython also works as before, except now it takes in Spark submit arguments too.

Author: Andrew Or <andrewor14@gmail.com>

Closes #799 from andrewor14/pyspark-submit and squashes the following commits:

bf37e36 [Andrew Or] Minor changes
01066fa [Andrew Or] bin/pyspark for Windows
c8cb3bf [Andrew Or] Handle perverse app names (with escaped quotes)
1866f85 [Andrew Or] Windows is not cooperating
456d844 [Andrew Or] Guard against shlex hanging if PYSPARK_SUBMIT_ARGS is not set
7eebda8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
b7ba0d8 [Andrew Or] Address a few comments (minor)
06eb138 [Andrew Or] Use shlex instead of writing our own parser
05879fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a823661 [Andrew Or] Fix --die-on-broken-pipe not propagated properly
6fba412 [Andrew Or] Deal with quotes + address various comments
fe4c8a7 [Andrew Or] Update --help for bin/pyspark
afe47bf [Andrew Or] Fix spark shell
f04aaa4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a371d26 [Andrew Or] Route bin/pyspark through Spark submit
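For orientation, here is a minimal, self-contained Scala sketch of the special-casing described above. The object and method names (`PySparkDispatchSketch`, `dispatch`) are hypothetical; the real logic lives in `SparkSubmit.createLaunchEnv` in the diff below.

object PySparkDispatchSketch {
  val PYSPARK_SHELL = "pyspark-shell"

  // Given a python primary resource, pick the JVM main class and its arguments.
  def dispatch(primaryResource: String, pyFiles: String, appArgs: Seq[String]): (String, Seq[String]) = {
    if (primaryResource == PYSPARK_SHELL) {
      // Shell case: the python process owns the session and launches this JVM,
      // which only needs to host a Py4J gateway.
      ("py4j.GatewayServer", Seq("--die-on-broken-pipe", "0"))
    } else {
      // Application case: PythonRunner is invoked as
      //   PythonRunner <main python file> <extra python files> [app arguments]
      ("org.apache.spark.deploy.PythonRunner", Seq(primaryResource, pyFiles) ++ appArgs)
    }
  }

  def main(args: Array[String]): Unit = {
    println(dispatch("pyspark-shell", "", Nil))
    println(dispatch("job.py", "deps.py", Seq("--input", "data.txt")))
  }
}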
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala          2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala          55
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                   2
4 files changed, 47 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index e20d4486c8..2dfa02bd26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -42,7 +42,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
- pathElements ++= pyFiles.split(",")
+ pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
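The hunk above only adds a null guard: `pyFiles` can be null when no extra python files were supplied, and calling `split` on it would throw. A standalone illustration of the pattern (the object name is hypothetical, and the trailing `filter` is added here only to drop the empty entry that `"".split(",")` produces):

object NullSafeSplitSketch {
  def splitPyFiles(pyFiles: String): Seq[String] =
    Option(pyFiles).getOrElse("").split(",").filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(splitPyFiles(null).toList)         // List() instead of a NullPointerException
    println(splitPyFiles("a.py,b.py").toList)  // List(a.py, b.py)
  }
}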
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e86182e4c5..a99b2176e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL
/**
- * A special jar name that indicates the class being run is inside of Spark itself,
- * and therefore no user jar is needed.
+ * Special primary resource names that represent shells rather than application jars.
*/
- private val RESERVED_JAR_NAME = "spark-internal"
+ private val SPARK_SHELL = "spark-shell"
+ private val PYSPARK_SHELL = "pyspark-shell"
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
- private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
- ArrayBuffer[String], Map[String, String], String) = {
+ private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+ : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}
- // If we're running a Python app, set the Java class to run to be our PythonRunner, add
- // Python files to deployment list, and pass the main file and Python path to PythonRunner
+ // If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
- args.mainClass = "org.apache.spark.deploy.PythonRunner"
- args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+ if (args.primaryResource == PYSPARK_SHELL) {
+ args.mainClass = "py4j.GatewayServer"
+ args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+ } else {
+ // If a python file is provided, add it to the child arguments and list of files to deploy.
+ // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+ args.mainClass = "org.apache.spark.deploy.PythonRunner"
+ args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+ args.files = mergeFileLists(args.files, args.primaryResource)
+ }
val pyFiles = Option(args.pyFiles).getOrElse("")
- args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
- args.primaryResource = RESERVED_JAR_NAME
+ args.files = mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}
// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
- if (args.primaryResource != RESERVED_JAR_NAME) {
+ if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
- if (args.primaryResource != RESERVED_JAR_NAME) {
+ if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
@@ -293,7 +299,7 @@ object SparkSubmit {
}
private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
- val localJarFile = new File(new URI(localJar).getPath())
+ val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists()) {
printWarning(s"Jar $localJar does not exist, skipping.")
}
@@ -303,6 +309,27 @@ object SparkSubmit {
}
/**
+ * Return whether the given primary resource represents a user jar.
+ */
+ private def isUserJar(primaryResource: String): Boolean = {
+ !isShell(primaryResource) && !isPython(primaryResource)
+ }
+
+ /**
+ * Return whether the given primary resource represents a shell.
+ */
+ private def isShell(primaryResource: String): Boolean = {
+ primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+ }
+
+ /**
+ * Return whether the given primary resource requires running python.
+ */
+ private[spark] def isPython(primaryResource: String): Boolean = {
+ primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+ }
+
+ /**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
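The doc comment above describes `mergeFileLists`, but its body is outside this diff. A hedged standalone sketch of the documented contract (hypothetical object name; nulls and empty strings are dropped, the rest joined with commas):

object MergeFileListsSketch {
  // Merge comma-separated file lists, ignoring null or empty entries.
  def mergeFileLists(lists: String*): String =
    lists.filter(s => s != null && s.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    println(mergeFileLists("a.txt,b.txt", null, "", "c.py"))  // a.txt,b.txt,c.py
  }
}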
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2d327aa3fb..264d4544cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
- isPython = v.endsWith(".py")
+ isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
- childArgs += value
+ if (!value.isEmpty) {
+ childArgs += value
+ }
parse(tail)
}
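The hunk above also filters out empty option values before they reach `childArgs`. A tiny illustration of that filtering behavior (hypothetical names, not the actual argument parser):

object SkipEmptyArgsSketch {
  // Mirror the guard above: only non-empty values are forwarded to the child process.
  def collectChildArgs(values: Seq[String]): Seq[String] = values.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(collectChildArgs(Seq("--input", "data.txt", "", "--verbose")))  // empty string dropped
  }
}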
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 388f722242..0c7cff019f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging {
* Strip the directory from a path name
*/
def stripDirectory(path: String): String = {
- path.split(File.separator).last
+ new File(path).getName
}
/**
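On the `Utils.stripDirectory` change: `String.split` interprets its argument as a regular expression, so splitting on `File.separator` misbehaves on Windows (a lone backslash is not a valid pattern), while `File.getName` is separator-aware. A quick standalone comparison (hypothetical object name):

import java.io.File

object StripDirectorySketch {
  def main(args: Array[String]): Unit = {
    val path = "core/src/main/scala/org/apache/spark/util/Utils.scala"
    println(new File(path).getName)  // Utils.scala, on any platform
    println(path.split("/").last)    // Utils.scala, but only for '/' separators
  }
}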