author     Matei Zaharia <matei@databricks.com>    2014-05-06 15:12:35 -0700
committer  Matei Zaharia <matei@databricks.com>    2014-05-06 15:12:35 -0700
commit     951a5d939863b42da83ac2569d5e9d7ed680e119 (patch)
tree       6ff0c545f577b05a86ce33d339cd0d487e935a38
parent     ec09acdd4a72333e1c9c2e9d8e12e9c4c07770c8 (diff)
download   spark-951a5d939863b42da83ac2569d5e9d7ed680e119.tar.gz
           spark-951a5d939863b42da83ac2569d5e9d7ed680e119.tar.bz2
           spark-951a5d939863b42da83ac2569d5e9d7ed680e119.zip
[SPARK-1549] Add Python support to spark-submit
This PR updates spark-submit to allow submitting Python scripts (currently only with deploy-mode=client, but that is all that was supported before) and updates the PySpark code to correctly locate the various paths it needs.

One significant change is that we assume the Python files can always be found either in the Spark assembly JAR (which is the case with the Maven assembly build used by make-distribution.sh) or under SPARK_HOME (which exists in local mode even with an sbt assembly build, and should be enough for testing). This means we no longer need a special hack to modify the environment for YARN.

This patch also updates the Python worker manager to run python with -u, i.e. with unbuffered output, so worker output reaches our logs immediately instead of being buffered; this should simplify debugging.

In addition, it fixes https://issues.apache.org/jira/browse/SPARK-1709 by setting the main class from a JAR's Main-Class manifest attribute when the user does not specify one, and it cleans up a few help strings and style issues in spark-submit.

In the future we may want to make the `pyspark` shell use spark-submit as well, but that seems unnecessary for 1.0.

Author: Matei Zaharia <matei@databricks.com>

Closes #664 from mateiz/py-submit and squashes the following commits:

15e9669 [Matei Zaharia] Fix some uses of path.separator property
051278c [Matei Zaharia] Small style fixes
0afe886 [Matei Zaharia] Add license headers
4650412 [Matei Zaharia] Add pyFiles to PYTHONPATH in executors, remove old YARN stuff, add tests
15f8e1e [Matei Zaharia] Set PYTHONPATH in PythonWorkerFactory in case it wasn't set from outside
47c0655 [Matei Zaharia] More work to make spark-submit work with Python:
d4375bd [Matei Zaharia] Clean up description of spark-submit args a bit and add Python ones
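With these changes, a Python application can be launched directly through spark-submit in client deploy mode. As a rough illustration (the script and archive names below are hypothetical; the --py-files usage mirrors the example added to docs/python-programming-guide.md in this patch, and the second command relies on the SPARK-1709 behavior of reading Main-Class from the JAR manifest):

    # Submit a Python script, shipping .zip/.egg dependencies on the workers' PYTHONPATH
    ./bin/spark-submit --master local[4] --py-files lib1.zip,lib2.zip my_script.py

    # Submit a JAR without --class; the main class is taken from its Main-Class manifest attribute
    ./bin/spark-submit --master local[4] app.jar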
-rw-r--r--  assembly/pom.xml  | 13
-rw-r--r--  core/pom.xml  | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala  | 42
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala  | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala  | 84
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala  | 183
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  | 84
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala  | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  | 6
-rw-r--r--  docs/python-programming-guide.md  | 28
-rw-r--r--  project/SparkBuild.scala  | 4
-rw-r--r--  python/pyspark/context.py  | 6
-rw-r--r--  python/pyspark/java_gateway.py  | 89
-rw-r--r--  python/pyspark/tests.py  | 131
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala  | 3
16 files changed, 505 insertions, 194 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index bdb3880649..7d123fb1d7 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -40,14 +40,6 @@
<deb.user>root</deb.user>
</properties>
- <repositories>
- <!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
- <repository>
- <id>lib</id>
- <url>file://${project.basedir}/lib</url>
- </repository>
- </repositories>
-
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -84,11 +76,6 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>net.sf.py4j</groupId>
- <artifactId>py4j</artifactId>
- <version>0.8.1</version>
- </dependency>
</dependencies>
<build>
diff --git a/core/pom.xml b/core/pom.xml
index c24c7be204..8fe215ab24 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -247,6 +247,11 @@
<artifactId>pyrolite</artifactId>
<version>2.0.1</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>0.8.1</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d40ed27da5..806e77d98f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import java.io.File
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.Await
@@ -304,7 +306,7 @@ object SparkEnv extends Logging {
k == "java.class.path"
}.getOrElse(("", ""))
val classPathEntries = classPathProperty._2
- .split(conf.get("path.separator", ":"))
+ .split(File.pathSeparator)
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
new file mode 100644
index 0000000000..cf69fa1d53
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkContext
+
+private[spark] object PythonUtils {
+ /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
+ def sparkPythonPath: String = {
+ val pythonPath = new ArrayBuffer[String]
+ for (sparkHome <- sys.env.get("SPARK_HOME")) {
+ pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
+ }
+ pythonPath ++= SparkContext.jarOfObject(this)
+ pythonPath.mkString(File.pathSeparator)
+ }
+
+ /** Merge PYTHONPATHS with the appropriate separator. Ignores blank strings. */
+ def mergePythonPaths(paths: String*): String = {
+ paths.filter(_ != "").mkString(File.pathSeparator)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce009..b0bf4e052b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
+ val pythonPath = PythonUtils.mergePythonPaths(
+ PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+
def create(): Socket = {
if (useDaemon) {
createThroughDaemon()
@@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
@@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
new file mode 100644
index 0000000000..f2e7c7a508
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{IOException, File, InputStream, OutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.python.PythonUtils
+
+/**
+ * A main class used by spark-submit to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+object PythonRunner {
+ def main(args: Array[String]) {
+ val primaryResource = args(0)
+ val pyFiles = args(1)
+ val otherArgs = args.slice(2, args.length)
+
+ val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+
+ // Launch a Py4J gateway server for the process to connect to; this will let it see our
+ // Java system properties and such
+ val gatewayServer = new py4j.GatewayServer(null, 0)
+ gatewayServer.start()
+
+ // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+ // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
+ val pathElements = new ArrayBuffer[String]
+ pathElements ++= pyFiles.split(",")
+ pathElements += PythonUtils.sparkPythonPath
+ pathElements += sys.env.getOrElse("PYTHONPATH", "")
+ val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+
+ // Launch Python process
+ val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+ val env = builder.environment()
+ env.put("PYTHONPATH", pythonPath)
+ env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+ builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ val process = builder.start()
+
+ new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+ System.exit(process.waitFor())
+ }
+
+ /**
+ * A utility class to redirect the child process's stdout or stderr
+ */
+ class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fb30e8a70f..e39723f383 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -60,11 +60,11 @@ object SparkSubmit {
private[spark] var exitFn: () => Unit = () => System.exit(-1)
private[spark] def printErrorAndExit(str: String) = {
- printStream.println("error: " + str)
- printStream.println("run with --help for more information or --verbose for debugging output")
+ printStream.println("Error: " + str)
+ printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn()
}
- private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+ private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
/**
* @return
@@ -72,43 +72,43 @@ object SparkSubmit {
* entries for the child, a list of system propertes, a list of env vars
* and the main class for the child
*/
- private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+ private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
- if (appArgs.master.startsWith("local")) {
+ if (args.master.startsWith("local")) {
clusterManager = LOCAL
- } else if (appArgs.master.startsWith("yarn")) {
+ } else if (args.master.startsWith("yarn")) {
clusterManager = YARN
- } else if (appArgs.master.startsWith("spark")) {
+ } else if (args.master.startsWith("spark")) {
clusterManager = STANDALONE
- } else if (appArgs.master.startsWith("mesos")) {
+ } else if (args.master.startsWith("mesos")) {
clusterManager = MESOS
} else {
- printErrorAndExit("master must start with yarn, mesos, spark, or local")
+ printErrorAndExit("Master must start with yarn, mesos, spark, or local")
}
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
- if (appArgs.deployMode == null &&
- (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
- appArgs.deployMode = "cluster"
+ if (args.deployMode == null &&
+ (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+ args.deployMode = "cluster"
}
- if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+ if (args.deployMode == "cluster" && args.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
- if (appArgs.deployMode == "client" &&
- (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
- printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+ if (args.deployMode == "client" &&
+ (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+ printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
+ "\" are not compatible")
}
- if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
- appArgs.master = "yarn-cluster"
+ if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
+ args.master = "yarn-cluster"
}
- if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
- appArgs.master = "yarn-client"
+ if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
+ args.master = "yarn-client"
}
- val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+ val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
val childClasspath = new ArrayBuffer[String]()
val childArgs = new ArrayBuffer[String]()
@@ -116,76 +116,93 @@ object SparkSubmit {
var childMainClass = ""
if (clusterManager == MESOS && deployOnCluster) {
- printErrorAndExit("Mesos does not support running the driver on the cluster")
+ printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}
+ // If we're running a Python app, set the Java class to run to be our PythonRunner, add
+ // Python files to deployment list, and pass the main file and Python path to PythonRunner
+ if (args.isPython) {
+ if (deployOnCluster) {
+ printErrorAndExit("Cannot currently run Python driver programs on cluster")
+ }
+ args.mainClass = "org.apache.spark.deploy.PythonRunner"
+ args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+ val pyFiles = Option(args.pyFiles).getOrElse("")
+ args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
+ args.primaryResource = RESERVED_JAR_NAME
+ sysProps("spark.submit.pyFiles") = pyFiles
+ }
+
+ // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
- childMainClass = appArgs.mainClass
- if (appArgs.primaryResource != RESERVED_JAR_NAME) {
- childClasspath += appArgs.primaryResource
+ childMainClass = args.mainClass
+ if (args.primaryResource != RESERVED_JAR_NAME) {
+ childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
- childArgs += ("--jar", appArgs.primaryResource)
- childArgs += ("--class", appArgs.mainClass)
+ childArgs += ("--jar", args.primaryResource)
+ childArgs += ("--class", args.mainClass)
}
+ // Make sure YARN is included in our build if we're trying to use it
if (clusterManager == YARN) {
- // The choice of class is arbitrary, could use any spark-yarn class
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
- val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
- "with YARN support."
- throw new Exception(msg)
+ printErrorAndExit("Could not load YARN classes. " +
+ "This copy of Spark may not have been compiled with YARN support.")
}
}
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
+ // A list of rules to map each argument to system properties or command-line options in
+ // each deploy mode; we iterate through these below
val options = List[OptionAssigner](
- new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
- new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+ OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
- new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
- new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
- new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
- new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
- new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
- new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
- new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
- new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
- new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
- new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+ OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
+ OptionAssigner(args.name, YARN, true, clOption = "--name"),
+ OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
+ OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
+ OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
+ OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
+ OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
- new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
- new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
- new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
- new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
- new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+ OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+ OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
+ OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
+ OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+ OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
- new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
- new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
- new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
- new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
- new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
- new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
- new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
- new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
- sysProp = "spark.app.name")
+ OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+ OptionAssigner(args.files, YARN, true, clOption = "--files"),
+ OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+ OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
+ OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+ OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+ OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
)
// For client mode make any added jars immediately visible on the classpath
- if (appArgs.jars != null && !deployOnCluster) {
- for (jar <- appArgs.jars.split(",")) {
+ if (args.jars != null && !deployOnCluster) {
+ for (jar <- args.jars.split(",")) {
childClasspath += jar
}
}
+ // Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
- (clusterManager & opt.clusterManager) != 0) {
+ (clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
} else if (opt.sysProp != null) {
@@ -197,32 +214,35 @@ object SparkSubmit {
// For standalone mode, add the application jar automatically so the user doesn't have to
// call sc.addJar. TODO: Standalone mode in the cluster
if (clusterManager == STANDALONE) {
- val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
- sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+ var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+ if (args.primaryResource != RESERVED_JAR_NAME) {
+ jars = jars ++ Seq(args.primaryResource)
+ }
+ sysProps.put("spark.jars", jars.mkString(","))
}
if (deployOnCluster && clusterManager == STANDALONE) {
- if (appArgs.supervise) {
+ if (args.supervise) {
childArgs += "--supervise"
}
childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
- childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+ childArgs += (args.master, args.primaryResource, args.mainClass)
}
// Arguments to be passed to user program
- if (appArgs.childArgs != null) {
+ if (args.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
- childArgs ++= appArgs.childArgs
+ childArgs ++= args.childArgs
} else if (clusterManager == YARN) {
- for (arg <- appArgs.childArgs) {
+ for (arg <- args.childArgs) {
childArgs += ("--arg", arg)
}
}
}
- for ((k, v) <- appArgs.getDefaultSparkProperties) {
+ for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}
@@ -230,8 +250,8 @@ object SparkSubmit {
}
private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
- sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
-
+ sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
+ {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -273,15 +293,26 @@ object SparkSubmit {
val url = localJarFile.getAbsoluteFile.toURI.toURL
loader.addURL(url)
}
+
+ /**
+ * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+ * no files, into a single comma-separated string.
+ */
+ private[spark] def mergeFileLists(lists: String*): String = {
+ val merged = lists.filter(_ != null)
+ .flatMap(_.split(","))
+ .mkString(",")
+ if (merged == "") null else merged
+ }
}
/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
-private[spark] class OptionAssigner(val value: String,
- val clusterManager: Int,
- val deployOnCluster: Boolean,
- val clOption: String = null,
- val sysProp: String = null
-) { }
+private[spark] case class OptionAssigner(
+ value: String,
+ clusterManager: Int,
+ deployOnCluster: Boolean,
+ clOption: String = null,
+ sysProp: String = null)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 7031cdd9b4..2d327aa3fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.io.{File, FileInputStream, IOException}
import java.util.Properties
+import java.util.jar.JarFile
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -52,6 +53,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var verbose: Boolean = false
+ var isPython: Boolean = false
+ var pyFiles: String = null
parseOpts(args.toList)
loadDefaults()
@@ -76,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
/** Fill in any undefined values based on the current properties file or built-in defaults. */
- private def loadDefaults() = {
+ private def loadDefaults(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
@@ -107,15 +110,43 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
master = Option(master).getOrElse(System.getenv("MASTER"))
deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+ // Try to set main class from JAR if no --class argument is given
+ if (mainClass == null && !isPython && primaryResource != null) {
+ try {
+ val jar = new JarFile(primaryResource)
+ // Note that this might still return null if no main-class is set; we catch that later
+ mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+ } catch {
+ case e: Exception =>
+ SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+ return
+ }
+ }
+
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
+
+ // Set name from main class if not given
+ name = Option(name).orElse(Option(mainClass)).orNull
+ if (name == null && primaryResource != null) {
+ name = Utils.stripDirectory(primaryResource)
+ }
}
/** Ensure that required fields exists. Call this only once all defaults are loaded. */
private def checkRequiredArguments() = {
- if (args.length == 0) printUsageAndExit(-1)
- if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
- if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ if (args.length == 0) {
+ printUsageAndExit(-1)
+ }
+ if (primaryResource == null) {
+ SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)")
+ }
+ if (mainClass == null && !isPython) {
+ SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+ }
+ if (pyFiles != null && !isPython) {
+ SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+ }
if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
@@ -143,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| queue $queue
| numExecutors $numExecutors
| files $files
+ | pyFiles $pyFiles
| archives $archives
| mainClass $mainClass
| primaryResource $primaryResource
@@ -234,6 +266,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
files = value
parse(tail)
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ parse(tail)
+
case ("--archives") :: value :: tail =>
archives = value
parse(tail)
@@ -260,9 +296,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
- primaryResource = v
- inSparkOpts = false
- parse(tail)
+ primaryResource = v
+ inSparkOpts = false
+ isPython = v.endsWith(".py")
+ parse(tail)
}
} else {
childArgs += value
@@ -270,7 +307,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
case Nil =>
- }
+ }
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -279,23 +316,26 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
- """Usage: spark-submit [options] <app jar> [app options]
+ """Usage: spark-submit [options] <app jar | python file> [app options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
- | --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
- | --class CLASS_NAME Name of your app's main class (required for Java apps).
- | --name NAME The name of your application (Default: 'Spark').
- | --jars JARS A comma-separated list of local jars to include on the
- | driver classpath and that SparkContext.addJar will work
- | with. Doesn't work on standalone with 'cluster' deploy mode.
- | --files FILES Comma separated list of files to be placed in the working dir
- | of each executor.
+ | --deploy-mode DEPLOY_MODE Where to run the driver program: either "client" to run
+ | on the local machine, or "cluster" to run inside cluster.
+ | --class CLASS_NAME Your application's main class (for Java / Scala apps).
+ | --name NAME A name of your application.
+ | --jars JARS Comma-separated list of local jars to include on the driver
+ | and executor classpaths. Doesn't work for drivers in
+ | standalone mode with "cluster" deploy mode.
+ | --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
+ | PYTHONPATH for Python apps.
+ | --files FILES Comma-separated list of files to be placed in the working
+ | directory of each executor.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.conf.
|
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
- | --driver-java-options Extra Java options to pass to the driver
- | --driver-library-path Extra library path entries to pass to the driver
+ | --driver-java-options Extra Java options to pass to the driver.
+ | --driver-library-path Extra library path entries to pass to the driver.
| --driver-class-path Extra class path entries to pass to the driver. Note that
| jars added with --jars are automatically included in the
| classpath.
@@ -311,10 +351,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
|
| YARN-only:
| --executor-cores NUM Number of cores per executor (Default: 1).
- | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
- | --num-executors NUM Number of executors to (Default: 2).
+ | --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
+ | --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
- | working dir of each executor.""".stripMargin
+ | working directory of each executor.""".stripMargin
)
SparkSubmit.exitFn()
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bef4dab3d7..202bd46956 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,7 +46,6 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
-
val random = new Random()
def sparkBin(sparkHome: String, which: String): File = {
@@ -1082,4 +1081,11 @@ private[spark] object Utils extends Logging {
def isTesting = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
+
+ /**
+ * Strip the directory from a path name
+ */
+ def stripDirectory(path: String): String = {
+ path.split(File.separator).last
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b3541b4a40..d7e3b22ed4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -83,7 +83,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handle binary specified but not class") {
- testPrematureExit(Array("foo.jar"), "Must specify a main class")
+ testPrematureExit(Array("foo.jar"), "No main class")
}
test("handles arguments with --key=val") {
@@ -94,9 +94,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles arguments to user program") {
- val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+ val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
val appArgs = new SparkSubmitArguments(clArgs)
- appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
+ appArgs.childArgs should be (Seq("some", "--weird", "args"))
}
test("handles YARN cluster mode") {
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98c456228a..8ea22e15a4 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -60,12 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
-Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
-The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
+configures the Java and Python environment for running Spark.
-# Running PySpark on YARN
-
-To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
# Interactive Use
@@ -103,7 +100,7 @@ $ MASTER=local[4] ./bin/pyspark
## IPython
-It is also possible to launch PySpark in [IPython](http://ipython.org), the
+It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the
enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
@@ -123,18 +120,17 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env
# Standalone Programs
-PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`.
+PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application.
-Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
+Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
-{% highlight python %}
-from pyspark import SparkContext
-sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
+{% highlight bash %}
+./bin/spark-submit --py-files lib1.zip,lib2.zip my_script.py
{% endhighlight %}
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
-Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+Code dependencies can also be added to an existing SparkContext at runtime using its `addPyFile()` method.
You can set [configuration properties](configuration.html#spark-properties) by passing a
[SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext:
@@ -142,12 +138,16 @@ You can set [configuration properties](configuration.html#spark-properties) by p
{% highlight python %}
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
- .setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
{% endhighlight %}
+`spark-submit` supports launching Python applications on standalone, Mesos or YARN clusters, through
+its `--master` argument. However, it currently requires the Python driver program to run on the local
+machine, not the cluster (i.e. the `--deploy-mode` parameter cannot be `cluster`).
+
+
# API Docs
[API documentation](api/python/index.html) for PySpark is available as Epydoc.
@@ -164,6 +164,6 @@ some example applications.
PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
You can run them by passing the files to `pyspark`; e.g.:
- ./bin/pyspark python/examples/wordcount.py
+ ./bin/spark-submit python/examples/wordcount.py
Each program prints usage help when run without arguments.
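The revised python-programming-guide.md above points standalone Python programs at `bin/spark-submit`. As a minimal sketch of that workflow against a standalone cluster (the master URL and dependency archive name are assumptions; the driver still runs on the submitting machine, since `--deploy-mode cluster` is not yet supported for Python):

    ./bin/spark-submit \
      --master spark://host:7077 \
      --py-files deps.zip \
      my_script.py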
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1ad05d9e46..7f9746ec4a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -356,7 +356,8 @@ object SparkBuild extends Build {
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
- "org.spark-project" % "pyrolite" % "2.0.1"
+ "org.spark-project" % "pyrolite" % "2.0.1",
+ "net.sf.py4j" % "py4j" % "0.8.1"
),
libraryDependencies ++= maybeAvro
)
@@ -569,7 +570,6 @@ object SparkBuild extends Build {
)
def assemblyProjSettings = sharedSettings ++ Seq(
- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1",
name := "spark-assembly",
assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn,
jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" },
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c74dc5fd4f..c7dc85ea03 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -158,6 +158,12 @@ class SparkContext(object):
for path in (pyFiles or []):
self.addPyFile(path)
+ # Deploy code dependencies set by spark-submit; these will already have been added
+ # with SparkContext.addFile, so we just need to add them
+ for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+ if path != "":
+ self._python_includes.append(os.path.basename(path))
+
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 032d960e40..3d0936fdca 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -27,39 +27,43 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
- set_env_vars_for_yarn()
-
- # Launch the Py4j gateway using Spark's run command so that we pick up the
- # proper classpath and settings from spark-env.sh
- on_windows = platform.system() == "Windows"
- script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
- command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
- "--die-on-broken-pipe", "0"]
- if not on_windows:
- # Don't send ctrl-c / SIGINT to the Java gateway:
- def preexec_func():
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ gateway_port = -1
+ if "PYSPARK_GATEWAY_PORT" in os.environ:
+ gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
- # preexec_fn not supported on Windows
- proc = Popen(command, stdout=PIPE, stdin=PIPE)
- # Determine which ephemeral port the server started on:
- port = int(proc.stdout.readline())
- # Create a thread to echo output from the GatewayServer, which is required
- # for Java log output to show up:
- class EchoOutputThread(Thread):
- def __init__(self, stream):
- Thread.__init__(self)
- self.daemon = True
- self.stream = stream
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and settings from spark-env.sh
+ on_windows = platform.system() == "Windows"
+ script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
+ command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ if not on_windows:
+ # Don't send ctrl-c / SIGINT to the Java gateway:
+ def preexec_func():
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ else:
+ # preexec_fn not supported on Windows
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ gateway_port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ sys.stderr.write(line)
+ EchoOutputThread(proc.stdout).start()
- def run(self):
- while True:
- line = self.stream.readline()
- sys.stderr.write(line)
- EchoOutputThread(proc.stdout).start()
# Connect to the gateway
- gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+ gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
+
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
@@ -70,28 +74,5 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
- return gateway
-def set_env_vars_for_yarn():
- # Add the spark jar, which includes the pyspark files, to the python path
- env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
- if "PYTHONPATH" in env_map:
- env_map["PYTHONPATH"] += ":spark.jar"
- else:
- env_map["PYTHONPATH"] = "spark.jar"
-
- os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
-
-def parse_env(env_str):
- # Turns a comma-separated of env settings into a dict that maps env vars to
- # their values.
- env = {}
- for var_str in env_str.split(","):
- parts = var_str.split("=")
- if len(parts) == 2:
- env[parts[0]] = parts[1]
- elif len(var_str) > 0:
- print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
- sys.exit(1)
-
- return env
+ return gateway
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8cf9d9cf1b..64f2eeb12b 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -22,11 +22,14 @@ individual modules.
from fileinput import input
from glob import glob
import os
+import re
import shutil
+import subprocess
import sys
-from tempfile import NamedTemporaryFile
+import tempfile
import time
import unittest
+import zipfile
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
@@ -55,7 +58,7 @@ class TestCheckpoint(PySparkTestCase):
def setUp(self):
PySparkTestCase.setUp(self)
- self.checkpointDir = NamedTemporaryFile(delete=False)
+ self.checkpointDir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(self.checkpointDir.name)
self.sc.setCheckpointDir(self.checkpointDir.name)
@@ -148,7 +151,7 @@ class TestRDDFunctions(PySparkTestCase):
# Regression test for SPARK-970
x = u"\u00A1Hola, mundo!"
data = self.sc.parallelize([x])
- tempFile = NamedTemporaryFile(delete=True)
+ tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
@@ -172,7 +175,7 @@ class TestRDDFunctions(PySparkTestCase):
def test_deleting_input_files(self):
# Regression test for SPARK-1025
- tempFile = NamedTemporaryFile(delete=False)
+ tempFile = tempfile.NamedTemporaryFile(delete=False)
tempFile.write("Hello World!")
tempFile.close()
data = self.sc.textFile(tempFile.name)
@@ -236,5 +239,125 @@ class TestDaemon(unittest.TestCase):
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
+class TestSparkSubmit(unittest.TestCase):
+ def setUp(self):
+ self.programDir = tempfile.mkdtemp()
+ self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+
+ def tearDown(self):
+ shutil.rmtree(self.programDir)
+
+ def createTempFile(self, name, content):
+ """
+ Create a temp file with the given name and content and return its path.
+ Strips leading spaces from content up to the first '|' in each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name)
+ with open(path, "w") as f:
+ f.write(content)
+ return path
+
+ def createFileInZip(self, name, content):
+ """
+ Create a zip archive containing a file with the given content and return its path.
+ Strips leading spaces from content up to the first '|' in each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name + ".zip")
+ with zipfile.ZipFile(path, 'w') as zip:
+ zip.writestr(name, content)
+ return path
+
+ def test_single_script(self):
+ """Submit and test a single script file"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+ def test_script_with_local_functions(self):
+ """Submit and test a single script file calling a global function"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 3
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[3, 6, 9]", out)
+
+ def test_module_dependency(self):
+ """Submit and test a script with a dependency on another module"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_module_dependency_on_cluster(self):
+ """Submit and test a script with a dependency on another module on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_single_script_on_cluster(self):
+ """Submit and test a single script on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 2
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e33f4f9803..566d96e16e 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -45,8 +45,7 @@ class ReplSuite extends FunSuite {
}
val interp = new SparkILoop(in, new PrintWriter(out), master)
org.apache.spark.repl.Main.interp = interp
- val separator = System.getProperty("path.separator")
- interp.process(Array("-classpath", paths.mkString(separator)))
+ interp.process(Array("-classpath", paths.mkString(File.pathSeparator)))
org.apache.spark.repl.Main.interp = null
if (interp.sparkContext != null) {
interp.sparkContext.stop()