Diffstat:
 assembly/pom.xml | 13
 core/pom.xml | 5
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 4
 core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 42
 core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 9
 core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala | 84
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 183
 core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 84
 core/src/main/scala/org/apache/spark/util/Utils.scala | 8
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 6
 docs/python-programming-guide.md | 28
 project/SparkBuild.scala | 4
 python/pyspark/context.py | 6
 python/pyspark/java_gateway.py | 89
 python/pyspark/tests.py | 131
 repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 3
 16 files changed, 505 insertions(+), 194 deletions(-)
diff --git a/assembly/pom.xml b/assembly/pom.xml
index bdb3880649..7d123fb1d7 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -40,14 +40,6 @@
<deb.user>root</deb.user>
</properties>
- <repositories>
- <!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
- <repository>
- <id>lib</id>
- <url>file://${project.basedir}/lib</url>
- </repository>
- </repositories>
-
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -84,11 +76,6 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>net.sf.py4j</groupId>
- <artifactId>py4j</artifactId>
- <version>0.8.1</version>
- </dependency>
</dependencies>
<build>
diff --git a/core/pom.xml b/core/pom.xml
index c24c7be204..8fe215ab24 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -247,6 +247,11 @@
<artifactId>pyrolite</artifactId>
<version>2.0.1</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>0.8.1</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d40ed27da5..806e77d98f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import java.io.File
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.Await
@@ -304,7 +306,7 @@ object SparkEnv extends Logging {
k == "java.class.path"
}.getOrElse(("", ""))
val classPathEntries = classPathProperty._2
- .split(conf.get("path.separator", ":"))
+ .split(File.pathSeparator)
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
new file mode 100644
index 0000000000..cf69fa1d53
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkContext
+
+private[spark] object PythonUtils {
+ /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
+ def sparkPythonPath: String = {
+ val pythonPath = new ArrayBuffer[String]
+ for (sparkHome <- sys.env.get("SPARK_HOME")) {
+ pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
+ }
+ pythonPath ++= SparkContext.jarOfObject(this)
+ pythonPath.mkString(File.pathSeparator)
+ }
+
+ /** Merge PYTHONPATH entries with the appropriate separator. Ignores blank strings. */
+ def mergePythonPaths(paths: String*): String = {
+ paths.filter(_ != "").mkString(File.pathSeparator)
+ }
+}
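
For context, here is a minimal sketch (not part of the patch) of how the two helpers above compose a worker PYTHONPATH. Because PythonUtils is private[spark], the sketch lives in the same package; the SPARK_HOME value mentioned in the comment is hypothetical.

package org.apache.spark.api.python

// Hypothetical driver-side usage: build the PYTHONPATH that PythonWorkerFactory passes
// to worker processes. With SPARK_HOME=/opt/spark this prints something like
// /opt/spark/python:/opt/spark/python/lib/py4j-0.8.1-src.zip:/home/user/libs
object PythonPathSketch {
  def main(args: Array[String]): Unit = {
    val merged = PythonUtils.mergePythonPaths(
      PythonUtils.sparkPythonPath,            // SPARK_HOME dirs and/or the Spark assembly JAR
      sys.env.getOrElse("PYTHONPATH", ""))    // blank strings are dropped by mergePythonPaths
    println(merged)
  }
}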
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce009..b0bf4e052b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
+ val pythonPath = PythonUtils.mergePythonPaths(
+ PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+
def create(): Socket = {
if (useDaemon) {
createThroughDaemon()
@@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
@@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
new file mode 100644
index 0000000000..f2e7c7a508
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{IOException, File, InputStream, OutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.python.PythonUtils
+
+/**
+ * A main class used by spark-submit to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+object PythonRunner {
+ def main(args: Array[String]) {
+ val primaryResource = args(0)
+ val pyFiles = args(1)
+ val otherArgs = args.slice(2, args.length)
+
+ val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+
+ // Launch a Py4J gateway server for the process to connect to; this will let it see our
+ // Java system properties and such
+ val gatewayServer = new py4j.GatewayServer(null, 0)
+ gatewayServer.start()
+
+ // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+ // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
+ val pathElements = new ArrayBuffer[String]
+ pathElements ++= pyFiles.split(",")
+ pathElements += PythonUtils.sparkPythonPath
+ pathElements += sys.env.getOrElse("PYTHONPATH", "")
+ val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+
+ // Launch Python process
+ val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+ val env = builder.environment()
+ env.put("PYTHONPATH", pythonPath)
+ env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+ builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ val process = builder.start()
+
+ new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+ System.exit(process.waitFor())
+ }
+
+ /**
+ * A utility class to redirect the child process's stdout or stderr
+ */
+ class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+ }
+}
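
As a usage note (not part of the patch), the argument layout PythonRunner.main expects — the primary .py file first, then the comma-separated --py-files list, then user arguments — can be sketched as below; all paths and arguments are hypothetical.

package org.apache.spark.deploy

// Hypothetical direct invocation mirroring what SparkSubmit passes as childArgs:
// args(0) = primary Python file, args(1) = comma-separated py-files, args(2..) = user args.
object PythonRunnerInvocationSketch {
  def main(args: Array[String]): Unit = {
    PythonRunner.main(Array(
      "/path/to/my_script.py",   // primaryResource
      "lib1.zip,lib2.zip",       // pyFiles ("" when --py-files is not given)
      "--input", "data.txt"))    // otherArgs forwarded to the Python program
  }
}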
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fb30e8a70f..e39723f383 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -60,11 +60,11 @@ object SparkSubmit {
private[spark] var exitFn: () => Unit = () => System.exit(-1)
private[spark] def printErrorAndExit(str: String) = {
- printStream.println("error: " + str)
- printStream.println("run with --help for more information or --verbose for debugging output")
+ printStream.println("Error: " + str)
+ printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn()
}
- private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+ private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
/**
* @return
@@ -72,43 +72,43 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
- private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+ private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
- if (appArgs.master.startsWith("local")) {
+ if (args.master.startsWith("local")) {
clusterManager = LOCAL
- } else if (appArgs.master.startsWith("yarn")) {
+ } else if (args.master.startsWith("yarn")) {
clusterManager = YARN
- } else if (appArgs.master.startsWith("spark")) {
+ } else if (args.master.startsWith("spark")) {
clusterManager = STANDALONE
- } else if (appArgs.master.startsWith("mesos")) {
+ } else if (args.master.startsWith("mesos")) {
clusterManager = MESOS
} else {
- printErrorAndExit("master must start with yarn, mesos, spark, or local")
+ printErrorAndExit("Master must start with yarn, mesos, spark, or local")
}
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
- if (appArgs.deployMode == null &&
- (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
- appArgs.deployMode = "cluster"
+ if (args.deployMode == null &&
+ (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+ args.deployMode = "cluster"
}
- if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+ if (args.deployMode == "cluster" && args.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
- if (appArgs.deployMode == "client" &&
- (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
- printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+ if (args.deployMode == "client" &&
+ (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+ printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
+ "\" are not compatible")
}
- if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
- appArgs.master = "yarn-cluster"
+ if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
+ args.master = "yarn-cluster"
}
- if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
- appArgs.master = "yarn-client"
+ if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
+ args.master = "yarn-client"
}
- val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+ val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
val childClasspath = new ArrayBuffer[String]()
val childArgs = new ArrayBuffer[String]()
@@ -116,76 +116,93 @@ object SparkSubmit {
var childMainClass = ""
if (clusterManager == MESOS && deployOnCluster) {
- printErrorAndExit("Mesos does not support running the driver on the cluster")
+ printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}
+ // If we're running a Python app, set the Java class to run to be our PythonRunner, add
+ // Python files to deployment list, and pass the main file and Python path to PythonRunner
+ if (args.isPython) {
+ if (deployOnCluster) {
+ printErrorAndExit("Cannot currently run Python driver programs on cluster")
+ }
+ args.mainClass = "org.apache.spark.deploy.PythonRunner"
+ args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+ val pyFiles = Option(args.pyFiles).getOrElse("")
+ args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
+ args.primaryResource = RESERVED_JAR_NAME
+ sysProps("spark.submit.pyFiles") = pyFiles
+ }
+
+ // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
- childMainClass = appArgs.mainClass
- if (appArgs.primaryResource != RESERVED_JAR_NAME) {
- childClasspath += appArgs.primaryResource
+ childMainClass = args.mainClass
+ if (args.primaryResource != RESERVED_JAR_NAME) {
+ childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
- childArgs += ("--jar", appArgs.primaryResource)
- childArgs += ("--class", appArgs.mainClass)
+ childArgs += ("--jar", args.primaryResource)
+ childArgs += ("--class", args.mainClass)
}
+ // Make sure YARN is included in our build if we're trying to use it
if (clusterManager == YARN) {
- // The choice of class is arbitrary, could use any spark-yarn class
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
- val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
- "with YARN support."
- throw new Exception(msg)
+ printErrorAndExit("Could not load YARN classes. " +
+ "This copy of Spark may not have been compiled with YARN support.")
}
}
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
+ // A list of rules to map each argument to system properties or command-line options in
+ // each deploy mode; we iterate through these below
val options = List[OptionAssigner](
- new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
- new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+ OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
- new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
- new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
- new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
- new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
- new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
- new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
- new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
- new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
- new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
- new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+ OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
+ OptionAssigner(args.name, YARN, true, clOption = "--name"),
+ OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
+ OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
+ OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
+ OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
+ OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
- new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
- new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
- new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
- new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
- new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+ OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+ OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
+ OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
+ OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+ OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
- new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
- new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
- new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
- new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
- new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
- new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
- new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
- new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
- sysProp = "spark.app.name")
+ OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+ OptionAssigner(args.files, YARN, true, clOption = "--files"),
+ OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+ OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
+ OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+ OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+ OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
)
// For client mode make any added jars immediately visible on the classpath
- if (appArgs.jars != null && !deployOnCluster) {
- for (jar <- appArgs.jars.split(",")) {
+ if (args.jars != null && !deployOnCluster) {
+ for (jar <- args.jars.split(",")) {
childClasspath += jar
}
}
+ // Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
- (clusterManager & opt.clusterManager) != 0) {
+ (clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
} else if (opt.sysProp != null) {
@@ -197,32 +214,35 @@ object SparkSubmit {
// For standalone mode, add the application jar automatically so the user doesn't have to
// call sc.addJar. TODO: Standalone mode in the cluster
if (clusterManager == STANDALONE) {
- val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
- sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+ var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+ if (args.primaryResource != RESERVED_JAR_NAME) {
+ jars = jars ++ Seq(args.primaryResource)
+ }
+ sysProps.put("spark.jars", jars.mkString(","))
}
if (deployOnCluster && clusterManager == STANDALONE) {
- if (appArgs.supervise) {
+ if (args.supervise) {
childArgs += "--supervise"
}
childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
- childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+ childArgs += (args.master, args.primaryResource, args.mainClass)
}
// Arguments to be passed to user program
- if (appArgs.childArgs != null) {
+ if (args.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
- childArgs ++= appArgs.childArgs
+ childArgs ++= args.childArgs
} else if (clusterManager == YARN) {
- for (arg <- appArgs.childArgs) {
+ for (arg <- args.childArgs) {
childArgs += ("--arg", arg)
}
}
}
- for ((k, v) <- appArgs.getDefaultSparkProperties) {
+ for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}
@@ -230,8 +250,8 @@ object SparkSubmit {
}
private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
- sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
-
+ sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
+ {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -273,15 +293,26 @@ object SparkSubmit {
val url = localJarFile.getAbsoluteFile.toURI.toURL
loader.addURL(url)
}
+
+ /**
+ * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+ * no files, into a single comma-separated string.
+ */
+ private[spark] def mergeFileLists(lists: String*): String = {
+ val merged = lists.filter(_ != null)
+ .flatMap(_.split(","))
+ .mkString(",")
+ if (merged == "") null else merged
+ }
}
/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
-private[spark] class OptionAssigner(val value: String,
- val clusterManager: Int,
- val deployOnCluster: Boolean,
- val clOption: String = null,
- val sysProp: String = null
-) { }
+private[spark] case class OptionAssigner(
+ value: String,
+ clusterManager: Int,
+ deployOnCluster: Boolean,
+ clOption: String = null,
+ sysProp: String = null)
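
To make the OptionAssigner dispatch above concrete, here is a self-contained sketch of how one rule maps an argument to either a command-line option or a system property. The bit-flag values and the sample rule are illustrative, not copied from SparkSubmit.

// Self-contained sketch of the bitmask dispatch used by the OptionAssigner rules above.
// The flag values mirror SparkSubmit's cluster-manager constants but are redefined here
// so the example compiles on its own.
object OptionAssignerSketch {
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8

  case class OptionAssigner(
      value: String,
      clusterManager: Int,
      deployOnCluster: Boolean,
      clOption: String = null,
      sysProp: String = null)

  def main(args: Array[String]): Unit = {
    val clusterManager = STANDALONE
    val deployOnCluster = false
    val sysProps = scala.collection.mutable.Map[String, String]()
    val childArgs = scala.collection.mutable.ArrayBuffer[String]()

    // Hypothetical rule: route executor memory to spark.executor.memory in client mode
    val rule = OptionAssigner("2g", STANDALONE | MESOS | YARN, false,
      sysProp = "spark.executor.memory")

    if (rule.value != null && deployOnCluster == rule.deployOnCluster &&
        (clusterManager & rule.clusterManager) != 0) {
      if (rule.clOption != null) {
        childArgs ++= Seq(rule.clOption, rule.value)
      } else if (rule.sysProp != null) {
        sysProps(rule.sysProp) = rule.value
      }
    }
    println(sysProps)  // Map(spark.executor.memory -> 2g)
  }
}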
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 7031cdd9b4..2d327aa3fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.io.{File, FileInputStream, IOException}
import java.util.Properties
+import java.util.jar.JarFile
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -52,6 +53,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var verbose: Boolean = false
+ var isPython: Boolean = false
+ var pyFiles: String = null
parseOpts(args.toList)
loadDefaults()
@@ -76,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
/** Fill in any undefined values based on the current properties file or built-in defaults. */
- private def loadDefaults() = {
+ private def loadDefaults(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
@@ -107,15 +110,43 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
master = Option(master).getOrElse(System.getenv("MASTER"))
deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+ // Try to set main class from JAR if no --class argument is given
+ if (mainClass == null && !isPython && primaryResource != null) {
+ try {
+ val jar = new JarFile(primaryResource)
+ // Note that this might still return null if no main-class is set; we catch that later
+ mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+ } catch {
+ case e: Exception =>
+ SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+ return
+ }
+ }
+
// Global defaults. These should be kept to a minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
+
+ // Set name from main class if not given
+ name = Option(name).orElse(Option(mainClass)).orNull
+ if (name == null && primaryResource != null) {
+ name = Utils.stripDirectory(primaryResource)
+ }
}
/** Ensure that required fields exist. Call this only once all defaults are loaded. */
private def checkRequiredArguments() = {
- if (args.length == 0) printUsageAndExit(-1)
- if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
- if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ if (args.length == 0) {
+ printUsageAndExit(-1)
+ }
+ if (primaryResource == null) {
+ SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)")
+ }
+ if (mainClass == null && !isPython) {
+ SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+ }
+ if (pyFiles != null && !isPython) {
+ SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+ }
if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
@@ -143,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| queue $queue
| numExecutors $numExecutors
| files $files
+ | pyFiles $pyFiles
| archives $archives
| mainClass $mainClass
| primaryResource $primaryResource
@@ -234,6 +266,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
files = value
parse(tail)
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ parse(tail)
+
case ("--archives") :: value :: tail =>
archives = value
parse(tail)
@@ -260,9 +296,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
- primaryResource = v
- inSparkOpts = false
- parse(tail)
+ primaryResource = v
+ inSparkOpts = false
+ isPython = v.endsWith(".py")
+ parse(tail)
}
} else {
childArgs += value
@@ -270,7 +307,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
case Nil =>
- }
+ }
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -279,23 +316,26 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
- """Usage: spark-submit [options] <app jar> [app options]
+ """Usage: spark-submit [options] <app jar | python file> [app options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
- | --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
- | --class CLASS_NAME Name of your app's main class (required for Java apps).
- | --name NAME The name of your application (Default: 'Spark').
- | --jars JARS A comma-separated list of local jars to include on the
- | driver classpath and that SparkContext.addJar will work
- | with. Doesn't work on standalone with 'cluster' deploy mode.
- | --files FILES Comma separated list of files to be placed in the working dir
- | of each executor.
+ | --deploy-mode DEPLOY_MODE Where to run the driver program: either "client" to run
+ | on the local machine, or "cluster" to run inside the cluster.
+ | --class CLASS_NAME Your application's main class (for Java / Scala apps).
+ | --name NAME The name of your application.
+ | --jars JARS Comma-separated list of local jars to include on the driver
+ | and executor classpaths. Doesn't work for drivers in
+ | standalone mode with "cluster" deploy mode.
+ | --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
+ | PYTHONPATH for Python apps.
+ | --files FILES Comma-separated list of files to be placed in the working
+ | directory of each executor.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.conf.
|
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
- | --driver-java-options Extra Java options to pass to the driver
- | --driver-library-path Extra library path entries to pass to the driver
+ | --driver-java-options Extra Java options to pass to the driver.
+ | --driver-library-path Extra library path entries to pass to the driver.
| --driver-class-path Extra class path entries to pass to the driver. Note that
| jars added with --jars are automatically included in the
| classpath.
@@ -311,10 +351,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
|
| YARN-only:
| --executor-cores NUM Number of cores per executor (Default: 1).
- | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
- | --num-executors NUM Number of executors to (Default: 2).
+ | --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
+ | --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
- | working dir of each executor.""".stripMargin
+ | working directory of each executor.""".stripMargin
)
SparkSubmit.exitFn()
}
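
The Main-Class inference added to loadDefaults() above boils down to reading the JAR manifest. A standalone sketch follows; the jar path is hypothetical, and the null-manifest case is handled explicitly here.

import java.util.jar.JarFile

// Read Main-Class from a JAR manifest, as loadDefaults() does when --class is omitted.
// JarFile.getManifest returns null if the JAR has no manifest, and getValue returns null
// when the Main-Class attribute is absent, so the result may be null either way.
object MainClassFromJarSketch {
  def main(args: Array[String]): Unit = {
    val jar = new JarFile("/path/to/userjar.jar")  // hypothetical path
    try {
      val mainClass = Option(jar.getManifest)
        .map(_.getMainAttributes.getValue("Main-Class"))
        .orNull
      println("Main-Class: " + mainClass)
    } finally {
      jar.close()
    }
  }
}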
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bef4dab3d7..202bd46956 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,7 +46,6 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
-
val random = new Random()
def sparkBin(sparkHome: String, which: String): File = {
@@ -1082,4 +1081,11 @@ private[spark] object Utils extends Logging {
def isTesting = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
+
+ /**
+ * Strip the directory from a path name
+ */
+ def stripDirectory(path: String): String = {
+ path.split(File.separator).last
+ }
}
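
For reference, the new Utils.stripDirectory helper is what SparkSubmitArguments uses to derive a default application name from the primary resource. Since Utils is private[spark], the quick check below sits inside the package; the path is hypothetical.

package org.apache.spark.util

// Quick check of the helper above: on a platform where File.separator is "/",
// this prints "my_app.jar" (the last path component).
object StripDirectorySketch {
  def main(args: Array[String]): Unit = {
    println(Utils.stripDirectory("/opt/jobs/my_app.jar"))
  }
}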
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b3541b4a40..d7e3b22ed4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -83,7 +83,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handle binary specified but not class") {
- testPrematureExit(Array("foo.jar"), "Must specify a main class")
+ testPrematureExit(Array("foo.jar"), "No main class")
}
test("handles arguments with --key=val") {
@@ -94,9 +94,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles arguments to user program") {
- val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+ val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
val appArgs = new SparkSubmitArguments(clArgs)
- appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
+ appArgs.childArgs should be (Seq("some", "--weird", "args"))
}
test("handles YARN cluster mode") {
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98c456228a..8ea22e15a4 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -60,12 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
-Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
-The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
+configures the Java and Python environment for running Spark.
-# Running PySpark on YARN
-
-To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
# Interactive Use
@@ -103,7 +100,7 @@ $ MASTER=local[4] ./bin/pyspark
## IPython
-It is also possible to launch PySpark in [IPython](http://ipython.org), the
+It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the
enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
@@ -123,18 +120,17 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env
# Standalone Programs
-PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`.
+PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application.
-Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
+Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
-{% highlight python %}
-from pyspark import SparkContext
-sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
+{% highlight bash %}
+./bin/spark-submit --py-files lib1.zip,lib2.zip my_script.py
{% endhighlight %}
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
-Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+Code dependencies can also be added to an existing SparkContext at runtime using its `addPyFile()` method.
You can set [configuration properties](configuration.html#spark-properties) by passing a
[SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext:
@@ -142,12 +138,16 @@ You can set [configuration properties](configuration.html#spark-properties) by p
{% highlight python %}
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
- .setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
{% endhighlight %}
+`spark-submit` supports launching Python applications on standalone, Mesos or YARN clusters, through
+its `--master` argument. However, it currently requires the Python driver program to run on the local
+machine, not the cluster (i.e. the `--deploy-mode` parameter cannot be `cluster`).
+
+
# API Docs
[API documentation](api/python/index.html) for PySpark is available as Epydoc.
@@ -164,6 +164,6 @@ some example applications.
PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
You can run them by passing the files to `pyspark`; e.g.:
- ./bin/pyspark python/examples/wordcount.py
+ ./bin/spark-submit python/examples/wordcount.py
Each program prints usage help when run without arguments.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1ad05d9e46..7f9746ec4a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -356,7 +356,8 @@ object SparkBuild extends Build {
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
- "org.spark-project" % "pyrolite" % "2.0.1"
+ "org.spark-project" % "pyrolite" % "2.0.1",
+ "net.sf.py4j" % "py4j" % "0.8.1"
),
libraryDependencies ++= maybeAvro
)
@@ -569,7 +570,6 @@ object SparkBuild extends Build {
)
def assemblyProjSettings = sharedSettings ++ Seq(
- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1",
name := "spark-assembly",
assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn,
jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" },
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c74dc5fd4f..c7dc85ea03 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -158,6 +158,12 @@ class SparkContext(object):
for path in (pyFiles or []):
self.addPyFile(path)
+ # Deploy code dependencies set by spark-submit; these will already have been added
+ # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
+ for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+ if path != "":
+ self._python_includes.append(os.path.basename(path))
+
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 032d960e40..3d0936fdca 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -27,39 +27,43 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
- set_env_vars_for_yarn()
-
- # Launch the Py4j gateway using Spark's run command so that we pick up the
- # proper classpath and settings from spark-env.sh
- on_windows = platform.system() == "Windows"
- script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
- command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
- "--die-on-broken-pipe", "0"]
- if not on_windows:
- # Don't send ctrl-c / SIGINT to the Java gateway:
- def preexec_func():
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ gateway_port = -1
+ if "PYSPARK_GATEWAY_PORT" in os.environ:
+ gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
- # preexec_fn not supported on Windows
- proc = Popen(command, stdout=PIPE, stdin=PIPE)
- # Determine which ephemeral port the server started on:
- port = int(proc.stdout.readline())
- # Create a thread to echo output from the GatewayServer, which is required
- # for Java log output to show up:
- class EchoOutputThread(Thread):
- def __init__(self, stream):
- Thread.__init__(self)
- self.daemon = True
- self.stream = stream
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and settings from spark-env.sh
+ on_windows = platform.system() == "Windows"
+ script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
+ command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ if not on_windows:
+ # Don't send ctrl-c / SIGINT to the Java gateway:
+ def preexec_func():
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ else:
+ # preexec_fn not supported on Windows
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ gateway_port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ sys.stderr.write(line)
+ EchoOutputThread(proc.stdout).start()
- def run(self):
- while True:
- line = self.stream.readline()
- sys.stderr.write(line)
- EchoOutputThread(proc.stdout).start()
# Connect to the gateway
- gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+ gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
+
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
@@ -70,28 +74,5 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
- return gateway
-def set_env_vars_for_yarn():
- # Add the spark jar, which includes the pyspark files, to the python path
- env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
- if "PYTHONPATH" in env_map:
- env_map["PYTHONPATH"] += ":spark.jar"
- else:
- env_map["PYTHONPATH"] = "spark.jar"
-
- os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
-
-def parse_env(env_str):
- # Turns a comma-separated of env settings into a dict that maps env vars to
- # their values.
- env = {}
- for var_str in env_str.split(","):
- parts = var_str.split("=")
- if len(parts) == 2:
- env[parts[0]] = parts[1]
- elif len(var_str) > 0:
- print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
- sys.exit(1)
-
- return env
+ return gateway
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8cf9d9cf1b..64f2eeb12b 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -22,11 +22,14 @@ individual modules.
from fileinput import input
from glob import glob
import os
+import re
import shutil
+import subprocess
import sys
-from tempfile import NamedTemporaryFile
+import tempfile
import time
import unittest
+import zipfile
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
@@ -55,7 +58,7 @@ class TestCheckpoint(PySparkTestCase):
def setUp(self):
PySparkTestCase.setUp(self)
- self.checkpointDir = NamedTemporaryFile(delete=False)
+ self.checkpointDir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(self.checkpointDir.name)
self.sc.setCheckpointDir(self.checkpointDir.name)
@@ -148,7 +151,7 @@ class TestRDDFunctions(PySparkTestCase):
# Regression test for SPARK-970
x = u"\u00A1Hola, mundo!"
data = self.sc.parallelize([x])
- tempFile = NamedTemporaryFile(delete=True)
+ tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
@@ -172,7 +175,7 @@ class TestRDDFunctions(PySparkTestCase):
def test_deleting_input_files(self):
# Regression test for SPARK-1025
- tempFile = NamedTemporaryFile(delete=False)
+ tempFile = tempfile.NamedTemporaryFile(delete=False)
tempFile.write("Hello World!")
tempFile.close()
data = self.sc.textFile(tempFile.name)
@@ -236,5 +239,125 @@ class TestDaemon(unittest.TestCase):
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
+class TestSparkSubmit(unittest.TestCase):
+ def setUp(self):
+ self.programDir = tempfile.mkdtemp()
+ self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+
+ def tearDown(self):
+ shutil.rmtree(self.programDir)
+
+ def createTempFile(self, name, content):
+ """
+ Create a temp file with the given name and content and return its path.
+ Strips leading spaces from content up to the first '|' in each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name)
+ with open(path, "w") as f:
+ f.write(content)
+ return path
+
+ def createFileInZip(self, name, content):
+ """
+ Create a zip archive containing a file with the given content and return its path.
+ Strips leading spaces from content up to the first '|' in each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name + ".zip")
+ with zipfile.ZipFile(path, 'w') as zip:
+ zip.writestr(name, content)
+ return path
+
+ def test_single_script(self):
+ """Submit and test a single script file"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+ def test_script_with_local_functions(self):
+ """Submit and test a single script file calling a global function"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 3
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[3, 6, 9]", out)
+
+ def test_module_dependency(self):
+ """Submit and test a script with a dependency on another module"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_module_dependency_on_cluster(self):
+ """Submit and test a script with a dependency on another module on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_single_script_on_cluster(self):
+ """Submit and test a single script on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 2
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e33f4f9803..566d96e16e 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -45,8 +45,7 @@ class ReplSuite extends FunSuite {
}
val interp = new SparkILoop(in, new PrintWriter(out), master)
org.apache.spark.repl.Main.interp = interp
- val separator = System.getProperty("path.separator")
- interp.process(Array("-classpath", paths.mkString(separator)))
+ interp.process(Array("-classpath", paths.mkString(File.pathSeparator)))
org.apache.spark.repl.Main.interp = null
if (interp.sparkContext != null) {
interp.sparkContext.stop()