-rw-r--r--  LICENSE | 2
-rwxr-xr-x  bin/pyspark | 2
-rw-r--r--  bin/pyspark2.cmd | 2
-rw-r--r--  core/pom.xml | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6 | 2
-rw-r--r--  python/docs/Makefile | 2
-rw-r--r--  python/lib/py4j-0.9-src.zip | bin 44846 -> 0 bytes
-rw-r--r--  python/lib/py4j-0.9.1-src.zip | bin 0 -> 47035 bytes
-rw-r--r--  python/pyspark/streaming/context.py | 89
-rw-r--r--  python/pyspark/streaming/util.py | 3
-rwxr-xr-x  sbin/spark-config.sh | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 4
18 files changed, 20 insertions(+), 112 deletions(-)
diff --git a/LICENSE b/LICENSE
index a2f75b817a..9c944ac610 100644
--- a/LICENSE
+++ b/LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
- (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+ (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/bin/pyspark b/bin/pyspark
index 5eaa17d3c2..2ac4a8be25 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a97d884f0b..51d6d15f66 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)
set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index 34ecb19654..3bec5debc2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
- <version>0.9</version>
+ <version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 2d97cd9a9a..bda872746c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
- pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
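
Note (not part of the patch): the launcher scripts above and PythonUtils.sparkPythonPath have to agree on the renamed zip. A minimal Python sketch, assuming SPARK_HOME points at an installed distribution and using a hypothetical helper name spark_python_path, of the entries the patched code now expects on PYTHONPATH:

    import os

    def spark_python_path(spark_home):
        # Mirrors PythonUtils.sparkPythonPath after this patch: pyspark.zip plus
        # the renamed py4j-0.9.1-src.zip under $SPARK_HOME/python/lib.
        lib = os.path.join(spark_home, "python", "lib")
        return os.pathsep.join([
            os.path.join(lib, "pyspark.zip"),
            os.path.join(lib, "py4j-0.9.1-src.zip"),
        ])

    # e.g. spark_python_path("/opt/spark") ->
    #   /opt/spark/python/lib/pyspark.zip:/opt/spark/python/lib/py4j-0.9.1-src.zip
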
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index cd3ff29350..53034a25d4 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -160,7 +160,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 0985089cce..a23e260641 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 50f062601c..6bedbed1e3 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 2b6ca983ad..7bfad57b4a 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -158,7 +158,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 4cec74f057..b6d24d8599 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
diff --git a/python/lib/py4j-0.9-src.zip b/python/lib/py4j-0.9-src.zip
deleted file mode 100644
index dace2d0fe3..0000000000
--- a/python/lib/py4j-0.9-src.zip
+++ /dev/null
Binary files differ
diff --git a/python/lib/py4j-0.9.1-src.zip b/python/lib/py4j-0.9.1-src.zip
new file mode 100644
index 0000000000..fedde845fd
--- /dev/null
+++ b/python/lib/py4j-0.9.1-src.zip
Binary files differ
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 0f1f005ce3..ec3ad9933c 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,7 +19,6 @@ from __future__ import print_function
import os
import sys
-from threading import RLock, Timer
from py4j.java_gateway import java_import, JavaObject
@@ -33,63 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
__all__ = ["StreamingContext"]
-class Py4jCallbackConnectionCleaner(object):
-
- """
- A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
- It will scan all callback connections every 30 seconds and close the dead connections.
- """
-
- def __init__(self, gateway):
- self._gateway = gateway
- self._stopped = False
- self._timer = None
- self._lock = RLock()
-
- def start(self):
- if self._stopped:
- return
-
- def clean_closed_connections():
- from py4j.java_gateway import quiet_close, quiet_shutdown
-
- callback_server = self._gateway._callback_server
- if callback_server:
- with callback_server.lock:
- try:
- closed_connections = []
- for connection in callback_server.connections:
- if not connection.isAlive():
- quiet_close(connection.input)
- quiet_shutdown(connection.socket)
- quiet_close(connection.socket)
- closed_connections.append(connection)
-
- for closed_connection in closed_connections:
- callback_server.connections.remove(closed_connection)
- except Exception:
- import traceback
- traceback.print_exc()
-
- self._start_timer(clean_closed_connections)
-
- self._start_timer(clean_closed_connections)
-
- def _start_timer(self, f):
- with self._lock:
- if not self._stopped:
- self._timer = Timer(30.0, f)
- self._timer.daemon = True
- self._timer.start()
-
- def stop(self):
- with self._lock:
- self._stopped = True
- if self._timer:
- self._timer.cancel()
- self._timer = None
-
-
class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -105,9 +47,6 @@ class StreamingContext(object):
# Reference to a currently active StreamingContext
_activeContext = None
- # A cleaner to clean leak sockets of callback server every 30 seconds
- _py4j_cleaner = None
-
def __init__(self, sparkContext, batchDuration=None, jssc=None):
"""
Create a new StreamingContext.
@@ -155,34 +94,12 @@ class StreamingContext(object):
# get the GatewayServer object in JVM by ID
jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
# update the port of CallbackClient with real port
- gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
- _py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
- _py4j_cleaner.start()
+ jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)
# register serializer for TransformFunction
# it happens before creating SparkContext when loading from checkpointing
- if cls._transformerSerializer is None:
- transformer_serializer = TransformFunctionSerializer()
- transformer_serializer.init(
- SparkContext._active_spark_context, CloudPickleSerializer(), gw)
- # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
- # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
- # (https://github.com/bartdag/py4j/pull/184)
- #
- # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
- # calling "registerSerializer". If we call "registerSerializer" twice, the second
- # PythonProxyHandler will override the first one, then the first one will be GCed and
- # trigger "PythonProxyHandler.finalize". To avoid that, we should not call
- # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
- # be GCed.
- #
- # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
- transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
- transformer_serializer)
- cls._transformerSerializer = transformer_serializer
- else:
- cls._transformerSerializer.init(
- SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+ cls._transformerSerializer = TransformFunctionSerializer(
+ SparkContext._active_spark_context, CloudPickleSerializer(), gw)
@classmethod
def getOrCreate(cls, checkpointPath, setupFunc):
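
Note (illustrative, not part of the patch): the hunk above drops both the reflection-based gw.jvm.PythonDStream.updatePythonGatewayPort call and the Py4jCallbackConnectionCleaner background timer, because Py4J 0.9.1 exposes resetCallbackClient on the GatewayServer and, per this change, the SPARK-12617 connection-leak workaround is no longer needed. A minimal sketch of the new call, assuming gw is PySpark's already-initialized JavaGateway and wrapping it in a hypothetical helper _reset_callback_port:

    from py4j.java_gateway import JavaObject

    def _reset_callback_port(gw):
        # Look up the GatewayServer instance registered in the JVM under its
        # well-known object ID.
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        # Py4J 0.9.1: rebuild the callback client with the same address but the
        # port the Python callback server actually bound to, instead of patching
        # the private "port" field via reflection as 0.9 required.
        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(),
                                 gw._python_proxy_port)
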
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index e617fc9ce9..abbbf6eb93 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -89,10 +89,11 @@ class TransformFunctionSerializer(object):
it uses this class to invoke Python, which returns the serialized function
as a byte array.
"""
- def init(self, ctx, serializer, gateway=None):
+ def __init__(self, ctx, serializer, gateway=None):
self.ctx = ctx
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
+ self.gateway.jvm.PythonDStream.registerSerializer(self)
self.failure = None
def dumps(self, id):
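
Note (illustrative, not part of the patch): with registration moved into __init__, the caller in context.py above simply constructs a fresh TransformFunctionSerializer whenever it attaches to a gateway (including after checkpoint recovery), and the duplicate-registration guard that worked around Py4J's PythonProxyHandler.finalize hang (SPARK-12511) goes away. A minimal usage sketch, assuming sc is an active SparkContext with its _gateway set up:

    from pyspark.serializers import CloudPickleSerializer
    from pyspark.streaming.util import TransformFunctionSerializer

    # Constructing the serializer also registers it with the JVM side
    # (PythonDStream.registerSerializer is now called inside __init__), so the
    # same line can safely run again when a context is recreated from a
    # checkpoint.
    serializer = TransformFunctionSerializer(sc, CloudPickleSerializer(), sc._gateway)
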
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index d8d9d00d64..0c37985a67 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 953fe95177..8c9beccc29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,16 +170,6 @@ private[python] object PythonDStream {
}
/**
- * Update the port of callback client to `port`
- */
- def updatePythonGatewayPort(gws: GatewayServer, port: Int): Unit = {
- val cl = gws.getCallbackClient
- val f = cl.getClass.getDeclaredField("port")
- f.setAccessible(true)
- f.setInt(cl, port)
- }
-
- /**
* helper function for DStream.foreachRDD(),
* cannot be `foreachRDD`, it will confusing py4j
*/
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8cf438be58..d4ca255953 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1044,9 +1044,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
- val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
+ val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
require(py4jFile.exists(),
- "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
+ "py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6db012a77a..b91c4be2ea 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -151,9 +151,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// When running tests, let's not assume the user has built the assembly module, which also
// creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
// needed locations.
- val sparkHome = sys.props("spark.test.home");
+ val sparkHome = sys.props("spark.test.home")
val pythonPath = Seq(
- s"$sparkHome/python/lib/py4j-0.9-src.zip",
+ s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),