aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-09-01 22:12:30 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-09-01 22:12:30 -0700
commitd9a53b94f81f6d706bece56c15a2f9e05143a350 (patch)
treea0161b7bed5e4a9e37b3a6b079f827a88b1fd2e7
parent3c520fea7782fd24b2e30347938af9769c72c4ea (diff)
parent9ee1e9db2ea4ae4794817ee6b4603ba869aaa7d4 (diff)
downloadspark-d9a53b94f81f6d706bece56c15a2f9e05143a350.tar.gz
spark-d9a53b94f81f6d706bece56c15a2f9e05143a350.tar.bz2
spark-d9a53b94f81f6d706bece56c15a2f9e05143a350.zip
Merge pull request #885 from mateiz/win-py
Allow PySpark to run on Windows
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala107
-rw-r--r--docs/configuration.md4
-rw-r--r--docs/index.md4
-rw-r--r--docs/python-programming-guide.md36
-rw-r--r--pyspark.cmd23
-rw-r--r--pyspark2.cmd55
-rw-r--r--python/pyspark/java_gateway.py17
-rw-r--r--python/pyspark/worker.py11
8 files changed, 221 insertions, 36 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 08e3f670f5..67d45723ba 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,8 +17,8 @@
package org.apache.spark.api.python
-import java.io.{File, DataInputStream, IOException}
-import java.net.{Socket, SocketException, InetAddress}
+import java.io.{OutputStreamWriter, File, DataInputStream, IOException}
+import java.net.{ServerSocket, Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
@@ -26,11 +26,30 @@ import org.apache.spark._
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
+
+ // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
+ // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
+ // only works on UNIX-based systems now because it uses signals for child management, so we can
+ // also fall back to launching workers (pyspark/worker.py) directly.
+ val useDaemon = !System.getProperty("os.name").startsWith("Windows")
+
var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
def create(): Socket = {
+ if (useDaemon) {
+ createThroughDaemon()
+ } else {
+ createSimpleWorker()
+ }
+ }
+
+ /**
+ * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
+ * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
+ */
+ private def createThroughDaemon(): Socket = {
synchronized {
// Start the daemon if it hasn't been started
startDaemon()
@@ -50,6 +69,78 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}
+ /**
+ * Launch a worker by executing worker.py directly and telling it to connect to us.
+ */
+ private def createSimpleWorker(): Socket = {
+ var serverSocket: ServerSocket = null
+ try {
+ serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+
+ // Create and start the worker
+ val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+ val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+ val workerEnv = pb.environment()
+ workerEnv.putAll(envVars)
+ val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+ workerEnv.put("PYTHONPATH", pythonPath)
+ val worker = pb.start()
+
+ // Redirect the worker's stderr to ours
+ new Thread("stderr reader for " + pythonExec) {
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val in = worker.getErrorStream
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ System.err.write(buf, 0, len)
+ len = in.read(buf)
+ }
+ }
+ }
+ }.start()
+
+ // Redirect worker's stdout to our stderr
+ new Thread("stdout reader for " + pythonExec) {
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val in = worker.getInputStream
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ System.err.write(buf, 0, len)
+ len = in.read(buf)
+ }
+ }
+ }
+ }.start()
+
+ // Tell the worker our port
+ val out = new OutputStreamWriter(worker.getOutputStream)
+ out.write(serverSocket.getLocalPort + "\n")
+ out.flush()
+
+ // Wait for it to connect to our socket
+ serverSocket.setSoTimeout(10000)
+ try {
+ return serverSocket.accept()
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Python worker did not connect back in time", e)
+ }
+ } finally {
+ if (serverSocket != null) {
+ serverSocket.close()
+ }
+ }
+ null
+ }
+
def stop() {
stopDaemon()
}
@@ -73,12 +164,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect the stderr to ours
new Thread("stderr reader for " + pythonExec) {
+ setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME HACK: We copy the stream on the level of bytes to
- // attempt to dodge encoding problems.
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val in = daemon.getErrorStream
- var buf = new Array[Byte](1024)
+ val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
@@ -93,11 +184,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect further stdout output to our stderr
new Thread("stdout reader for " + pythonExec) {
+ setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME HACK: We copy the stream on the level of bytes to
- // attempt to dodge encoding problems.
- var buf = new Array[Byte](1024)
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
diff --git a/docs/configuration.md b/docs/configuration.md
index 58e9434bdc..aaf85ed4f4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -315,7 +315,7 @@ Apart from these, the following properties are also available, and may be useful
# Environment Variables
Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
-script in the directory where Spark is installed. These variables are meant to be for machine-specific settings, such
+script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
these properties within the application instead of in `spark-env.sh` so that different applications can use different
settings.
@@ -325,6 +325,8 @@ Note that `conf/spark-env.sh` does not exist by default when Spark is installed.
The following variables can be set in `spark-env.sh`:
+* `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`)
+* `PYSPARK_PYTHON`, the Python binary to use for PySpark
* `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath that you want to be present for _all_ applications.
diff --git a/docs/index.md b/docs/index.md
index 3cf9cc1c64..c7018d8846 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -11,6 +11,8 @@ Spark can run on the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or w
Get Spark by visiting the [downloads page](http://spark.incubator.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}.
+Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation.
+
# Building
Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
@@ -50,6 +52,8 @@ In addition, if you wish to run Spark on [YARN](running-on-yarn.md), set
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
+(Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`.)
+
# Where to Go from Here
**Programming guides:**
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 8a539fe774..8c33a953a4 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -4,7 +4,7 @@ title: Python Programming Guide
---
-The Spark Python API (PySpark) exposes most of the Spark features available in the Scala version to Python.
+The Spark Python API (PySpark) exposes the Spark programming model to Python.
To learn the basics of Spark, we recommend reading through the
[Scala programming guide](scala-programming-guide.html) first; it should be
easy to follow even if you don't know Scala.
@@ -15,12 +15,8 @@ This guide will show how to use the Spark features described there in Python.
There are a few key differences between the Python and Scala APIs:
-* Python is dynamically typed, so RDDs can hold objects of different types.
-* PySpark does not currently support the following Spark features:
- - `lookup`
- - `sort`
- - `persist` at storage levels other than `MEMORY_ONLY`
- - Execution on Windows -- this is slated for a future release
+* Python is dynamically typed, so RDDs can hold objects of multiple types.
+* PySpark does not yet support a few API calls, such as `lookup`, `sort`, and `persist` at custom storage levels. See the [API docs](api/pyspark/index.html) for details.
In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax:
@@ -30,7 +26,7 @@ logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)
{% endhighlight %}
-You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`:
+You can also pass functions that are defined with the `def` keyword; this is useful for longer functions that can't be expressed using `lambda`:
{% highlight python %}
def is_error(line):
@@ -38,7 +34,7 @@ def is_error(line):
errors = logData.filter(is_error)
{% endhighlight %}
-Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated to other tasks:
+Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated back:
{% highlight python %}
error_keywords = ["Exception", "Error"]
@@ -51,17 +47,20 @@ PySpark will automatically ship these functions to workers, along with any objec
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
+In addition, PySpark fully supports interactive use---simply run `./pyspark` to launch an interactive shell.
+
# Installing and Configuring PySpark
PySpark requires Python 2.6 or higher.
-PySpark jobs are executed using a standard cPython interpreter in order to support Python modules that use C extensions.
+PySpark jobs are executed using a standard CPython interpreter in order to support Python modules that use C extensions.
We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).
-By default, PySpark's scripts will run programs using `python`; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh`.
+
+By default, PySpark requires `python` to be available on the system `PATH` and use it to run programs; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh` (or `.cmd` on Windows).
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
-Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh`.
+Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.
@@ -101,7 +100,7 @@ $ MASTER=local[4] ./pyspark
## IPython
It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
-To do this, simply set the `IPYTHON` variable to `1` when running `pyspark`:
+To do this, set the `IPYTHON` variable to `1` when running `pyspark`:
{% highlight bash %}
$ IPYTHON=1 ./pyspark
@@ -132,15 +131,16 @@ sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+# API Docs
+
+[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc.
+Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
# Where to Go from Here
-PySpark includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
-You can run them by passing the files to the `pyspark` script; e.g.:
+PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
+You can run them by passing the files to `pyspark`; e.g.:
./pyspark python/examples/wordcount.py
Each program prints usage help when run without arguments.
-
-We currently provide [API documentation](api/pyspark/index.html) for the Python API as Epydoc.
-Many of the RDD method descriptions contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
diff --git a/pyspark.cmd b/pyspark.cmd
new file mode 100644
index 0000000000..7c26fbbac2
--- /dev/null
+++ b/pyspark.cmd
@@ -0,0 +1,23 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This is the entry point for running PySpark. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+cmd /V /E /C %~dp0pyspark2.cmd %*
diff --git a/pyspark2.cmd b/pyspark2.cmd
new file mode 100644
index 0000000000..f58e349643
--- /dev/null
+++ b/pyspark2.cmd
@@ -0,0 +1,55 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+set SCALA_VERSION=2.9.3
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Test whether the user has built Spark
+if exist "%FWDIR%RELEASE" goto skip_build_test
+set FOUND_JAR=0
+for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
+ set FOUND_JAR=1
+)
+if "%FOUND_JAR%"=="0" (
+ echo Failed to find Spark assembly JAR.
+ echo You need to build Spark with sbt\sbt assembly before running this program.
+ goto exit
+)
+:skip_build_test
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Figure out which Python to use.
+if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+
+set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+
+set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
+set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+
+echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
+
+"%PYSPARK_PYTHON%" %*
+:exit
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 26fbe0f080..e615c1e9b6 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -18,6 +18,7 @@
import os
import sys
import signal
+import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -29,12 +30,18 @@ SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and SPARK_MEM settings from spark-env.sh
- command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
+ on_windows = platform.system() == "Windows"
+ script = "spark-class.cmd" if on_windows else "spark-class"
+ command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
- # Don't send ctrl-c / SIGINT to the Java gateway:
- def preexec_function():
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)
+ if not on_windows:
+ # Don't send ctrl-c / SIGINT to the Java gateway:
+ def preexec_func():
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ else:
+ # preexec_fn not supported on Windows
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
# Determine which ephemeral port the server started on:
port = int(proc.stdout.readline())
# Create a thread to echo output from the GatewayServer, which is required
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 695f6dfb84..d63c2aaef7 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -21,6 +21,7 @@ Worker that receives input from Piped RDD.
import os
import sys
import time
+import socket
import traceback
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
@@ -94,7 +95,9 @@ def main(infile, outfile):
if __name__ == '__main__':
- # Redirect stdout to stderr so that users must return values from functions.
- old_stdout = os.fdopen(os.dup(1), 'w')
- os.dup2(2, 1)
- main(sys.stdin, old_stdout)
+ # Read a local port to connect to from stdin
+ java_port = int(sys.stdin.readline())
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(("127.0.0.1", java_port))
+ sock_file = sock.makefile("a+", 65536)
+ main(sock_file, sock_file)