author     Holden Karau <holden@pigscanfly.ca>      2015-10-20 10:52:49 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-10-20 10:52:49 -0700
commit     e18b571c3374ecbfc0b20a5064cb58d57a2a7d21 (patch)
tree       e80aeebc7f8a247dff692f46a2bf7860a0e592dc
parent     94139557c56cea318d4a4f82a4deaf72198f349a (diff)
[SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9
Upgrade to Py4J 0.9.

Author: Holden Karau <holden@pigscanfly.ca>
Author: Holden Karau <holden@us.ibm.com>

Closes #8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.
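For context, the main user-visible change in Py4J 0.9 is that callback-server daemonization is configured through CallbackServerParameters instead of monkey-patching CallbackServer.start, which is what the helper removed from python/pyspark/streaming/context.py used to do. The snippet below is a hedged sketch of the new-style setup, not part of this patch; it assumes a Spark JVM gateway is available for any real calls, and the port read-back mirrors what the patched StreamingContext does.

    # Sketch only: daemonized Py4J 0.9 callback server on an ephemeral port,
    # mirroring the new logic in streaming/context.py.
    from py4j.java_gateway import JavaGateway, CallbackServerParameters

    gw = JavaGateway(
        callback_server_parameters=CallbackServerParameters(
            daemonize=True,              # callback thread no longer blocks driver exit
            daemonize_connections=True,
            port=0))                     # 0 = let the OS pick a free port

    # Read back the port the server socket actually bound to, as the patch does.
    # (_callback_server is a private attribute; this mirrors the patched code.)
    bound_port = gw._callback_server.server_socket.getsockname()[1]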
-rw-r--r--   LICENSE | 2
-rwxr-xr-x   bin/pyspark | 2
-rw-r--r--   bin/pyspark2.cmd | 2
-rw-r--r--   core/pom.xml | 2
-rw-r--r--   core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 2
-rw-r--r--   python/docs/Makefile | 2
-rw-r--r--   python/lib/py4j-0.8.2.1-src.zip | bin 37562 -> 0 bytes
-rw-r--r--   python/lib/py4j-0.9-src.zip | bin 0 -> 44846 bytes
-rw-r--r--   python/pyspark/streaming/context.py | 54
-rw-r--r--   python/pyspark/streaming/flume.py | 2
-rw-r--r--   python/pyspark/streaming/kafka.py | 2
-rw-r--r--   python/pyspark/streaming/kinesis.py | 2
-rw-r--r--   python/pyspark/streaming/mqtt.py | 2
-rw-r--r--   python/pyspark/streaming/tests.py | 18
-rwxr-xr-x   sbin/spark-config.sh | 2
-rw-r--r--   yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4
-rw-r--r--   yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 2
17 files changed, 34 insertions(+), 66 deletions(-)
diff --git a/LICENSE b/LICENSE
index dca03ab16d..790476ece1 100644
--- a/LICENSE
+++ b/LICENSE
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
- (The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
+ (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/bin/pyspark b/bin/pyspark
index 8f2a3b5a77..18012ee4a0 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
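The launcher scripts and spark-config.sh all point PYTHONPATH at the bundled py4j-0.9-src.zip. For embedding PySpark outside these scripts, the same convention can be reproduced from plain Python; the snippet below is an illustration only (mutating sys.path and the SPARK_HOME lookup are assumptions, not part of this patch).

    # Illustration only: put the bundled Py4J 0.9 sources on the import path
    # the same way bin/pyspark does via PYTHONPATH.
    import os
    import sys

    spark_home = os.environ["SPARK_HOME"]  # assumed to be set
    sys.path.insert(0, os.path.join(spark_home, "python"))
    sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.9-src.zip"))

    import py4j  # now resolves from the bundled 0.9 source zip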
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 3c6169983e..a97d884f0b 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)
set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index fdcb6a7902..319a50049a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
- <version>0.8.2.1</version>
+ <version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 31e534f160..292ac4cfc3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
- pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 8a1324eecd..4cec74f057 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.8.2.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
diff --git a/python/lib/py4j-0.8.2.1-src.zip b/python/lib/py4j-0.8.2.1-src.zip
deleted file mode 100644
index 5203b84d91..0000000000
--- a/python/lib/py4j-0.8.2.1-src.zip
+++ /dev/null
Binary files differ
diff --git a/python/lib/py4j-0.9-src.zip b/python/lib/py4j-0.9-src.zip
new file mode 100644
index 0000000000..dace2d0fe3
--- /dev/null
+++ b/python/lib/py4j-0.9-src.zip
Binary files differ
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index a8c9ffc235..975c754732 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -32,48 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
__all__ = ["StreamingContext"]
-def _daemonize_callback_server():
- """
- Hack Py4J to daemonize callback server
-
- The thread of callback server has daemon=False, it will block the driver
- from exiting if it's not shutdown. The following code replace `start()`
- of CallbackServer with a new version, which set daemon=True for this
- thread.
-
- Also, it will update the port number (0) with real port
- """
- # TODO: create a patch for Py4J
- import socket
- import py4j.java_gateway
- logger = py4j.java_gateway.logger
- from py4j.java_gateway import Py4JNetworkError
- from threading import Thread
-
- def start(self):
- """Starts the CallbackServer. This method should be called by the
- client instead of run()."""
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
- 1)
- try:
- self.server_socket.bind((self.address, self.port))
- if not self.port:
- # update port with real port
- self.port = self.server_socket.getsockname()[1]
- except Exception as e:
- msg = 'An error occurred while trying to start the callback server: %s' % e
- logger.exception(msg)
- raise Py4JNetworkError(msg)
-
- # Maybe thread needs to be cleanup up?
- self.thread = Thread(target=self.run)
- self.thread.daemon = True
- self.thread.start()
-
- py4j.java_gateway.CallbackServer.start = start
-
-
class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ class StreamingContext(object):
# start callback server
# getattr will fallback to JVM, so we cannot test by hasattr()
- if "_callback_server" not in gw.__dict__:
- _daemonize_callback_server()
- # use random port
- gw._start_callback_server(0)
+ if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+ gw.callback_server_parameters.eager_load = True
+ gw.callback_server_parameters.daemonize = True
+ gw.callback_server_parameters.daemonize_connections = True
+ gw.callback_server_parameters.port = 0
+ gw.start_callback_server(gw.callback_server_parameters)
+ cbport = gw._callback_server.server_socket.getsockname()[1]
+ gw._callback_server.port = cbport
# gateway with real port
gw._python_proxy_port = gw._callback_server.port
# get the GatewayServer object in JVM by ID
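The removed _daemonize_callback_server hack existed because a non-daemon callback-server thread keeps the Python driver alive after the main thread finishes; setting daemonize=True in the new parameters addresses the same problem directly. A small, hypothetical illustration of that daemon-thread behaviour in plain threading terms (not PySpark code):

    # Hypothetical illustration of the daemon-thread behaviour the patch relies on.
    import threading
    import time

    def serve_forever():
        while True:          # stands in for the callback server's accept loop
            time.sleep(1)

    t = threading.Thread(target=serve_forever)
    t.daemon = True          # with daemon=False the interpreter would never exit
    t.start()
    # The main thread returns; the daemonized "server" does not block interpreter exit.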
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index c0cdc50d8d..b3d1905365 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -20,7 +20,7 @@ if sys.version >= "3":
from io import BytesIO
else:
from StringIO import StringIO
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8a814c64c0..b35bbaf404 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
from pyspark.rdd import RDD
from pyspark.storagelevel import StorageLevel
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index 34be5880e1..af72c3d690 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index fa83006c36..1ce4093196 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import UTF8Deserializer
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index e4e56fff3b..49634252fd 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -61,9 +61,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
def tearDownClass(cls):
cls.sc.stop()
# Clean up in the JVM just in case there has been some issues in Python API
- jSparkContextOption = SparkContext._jvm.SparkContext.get()
- if jSparkContextOption.nonEmpty():
- jSparkContextOption.get().stop()
+ try:
+ jSparkContextOption = SparkContext._jvm.SparkContext.get()
+ if jSparkContextOption.nonEmpty():
+ jSparkContextOption.get().stop()
+ except:
+ pass
def setUp(self):
self.ssc = StreamingContext(self.sc, self.duration)
@@ -72,9 +75,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
if self.ssc is not None:
self.ssc.stop(False)
# Clean up in the JVM just in case there has been some issues in Python API
- jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
- if jStreamingContextOption.nonEmpty():
- jStreamingContextOption.get().stop(False)
+ try:
+ jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+ if jStreamingContextOption.nonEmpty():
+ jStreamingContextOption.get().stop(False)
+ except:
+ pass
def wait_for(self, result, n):
start_time = time.time()
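The test changes above make the JVM-side cleanup tolerate a gateway that has already shut down. A hedged sketch of the same defensive teardown pattern outside the test suite (the helper name is a placeholder, and a narrower except clause is used here than in the patch):

    # Sketch only: tolerate a dead gateway during best-effort JVM cleanup.
    from py4j.protocol import Py4JError

    def stop_jvm_context_quietly(jvm):
        try:
            ctx_option = jvm.SparkContext.get()  # JVM-side Option, as in the patch
            if ctx_option.nonEmpty():
                ctx_option.get().stop()
        except Py4JError:
            pass  # gateway already closed or JVM already gone; nothing to do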
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index b0361d72d3..e6bf544c14 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -36,4 +36,4 @@ export SPARK_HOME="${SPARK_PREFIX}"
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08aecfa7f6..754215db2a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1008,9 +1008,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
- val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+ val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
require(py4jFile.exists(),
- "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+ "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d1cd0c89b5..6db012a77a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -153,7 +153,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// needed locations.
val sparkHome = sys.props("spark.test.home");
val pythonPath = Seq(
- s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
+ s"$sparkHome/python/lib/py4j-0.9-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),