author     Jason White <jason.white@shopify.com>   2016-10-03 14:12:03 -0700
committer  Davies Liu <davies.liu@gmail.com>       2016-10-03 14:12:03 -0700
commit     1f31bdaef670dd43999613deae3620f4ddcd1fbf (patch)
tree       0814b539f497beddf89f2421e90aa93d0bf83c6b
parent     1dd68d3827133d203e85294405400b04904879e0 (diff)
[SPARK-17679] [PYSPARK] remove unnecessary Py4J ListConverter patch
## What changes were proposed in this pull request?

This PR removes a patch on ListConverter from https://github.com/apache/spark/pull/5570, as it is no longer necessary. The underlying Py4J issue, https://github.com/bartdag/py4j/issues/160, was fixed upstream in https://github.com/bartdag/py4j/commit/224b94b6665e56a93a064073886e1d803a4969d2, and the fix is included in 0.10.3, the version Spark currently uses.

## How was this patch tested?

The original test added in https://github.com/apache/spark/pull/5570 remains.

Author: Jason White <jason.white@shopify.com>

Closes #15254 from JasonMWhite/remove_listconverter_patch.
-rw-r--r--  python/pyspark/java_gateway.py  |  9
-rw-r--r--  python/pyspark/ml/common.py     |  4
-rw-r--r--  python/pyspark/mllib/common.py  |  4
-rw-r--r--  python/pyspark/rdd.py           | 13
4 files changed, 6 insertions(+), 24 deletions(-)
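For context on why the explicit converters removed below become redundant: PySpark creates its Py4J gateway with auto_convert enabled, so a plain Python list is converted to a java.util.List at the Java call boundary. A minimal sketch of that behavior, assuming a JVM-side GatewayServer is already listening on the default port (java.util.Stack is used only as a convenient List implementation):

```python
from py4j.java_gateway import JavaGateway, GatewayParameters

# assumes a JVM-side GatewayServer is already listening on the default port
gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))

stack = gateway.jvm.java.util.Stack()
stack.addAll([1, 2, 3])  # the plain Python list auto-converts to a java.util.List
print(stack.pop())       # -> 3, served from the JVM
```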
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 527ca82d31..f76cadcf62 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,18 +29,9 @@ if sys.version >= '3':
xrange = range
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-from py4j.java_collections import ListConverter
-
from pyspark.serializers import read_int
-# patching ListConverter, or it will convert bytearray into Java ArrayList
-def can_convert_list(self, obj):
- return isinstance(obj, (list, tuple, xrange))
-
-ListConverter.can_convert = can_convert_list
-
-
def launch_gateway():
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
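The block removed above was a workaround for https://github.com/bartdag/py4j/issues/160: the stock ListConverter.can_convert also claimed bytearray, so binary payloads could be silently turned into a java.util.ArrayList. A sketch of the workaround's logic (Python 3 spelling, where the module aliases xrange to range); it is only needed on Py4J versions before 0.10.3:

```python
from py4j.java_collections import ListConverter

def can_convert_list(self, obj):
    # deliberately exclude bytearray so binary data keeps crossing
    # the gateway as byte[] instead of java.util.ArrayList
    return isinstance(obj, (list, tuple, range))

# monkey-patch; redundant on py4j >= 0.10.3
ListConverter.can_convert = can_convert_list
```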
diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py
index aec860fca7..387c5d7309 100644
--- a/python/pyspark/ml/common.py
+++ b/python/pyspark/ml/common.py
@@ -23,7 +23,7 @@ if sys.version >= '3':
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
-from py4j.java_collections import ListConverter, JavaArray, JavaList
+from py4j.java_collections import JavaArray, JavaList
from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
@@ -76,7 +76,7 @@ def _py2java(sc, obj):
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
- obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
+ obj = [_py2java(sc, x) for x in obj]
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
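The change above swaps an eager, explicit conversion for Py4J's lazy one; both produce a java.util.List on the JVM side. A rough equivalence sketch, assuming a reachable GatewayServer and a gateway built with auto_convert=True:

```python
from py4j.java_collections import ListConverter
from py4j.java_gateway import JavaGateway, GatewayParameters

gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))

# old style: convert explicitly before handing the list to Java
eager = ListConverter().convert([1, 2, 3], gateway._gateway_client)

# new style: pass the plain list and let auto_convert do the same work
stack = gateway.jvm.java.util.Stack()
stack.addAll([1, 2, 3])

assert list(eager) == list(stack)
```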
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 21f0e09ea7..bac8f35056 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -23,7 +23,7 @@ if sys.version >= '3':
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
-from py4j.java_collections import ListConverter, JavaArray, JavaList
+from py4j.java_collections import JavaArray, JavaList
from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
@@ -78,7 +78,7 @@ def _py2java(sc, obj):
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
- obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
+ obj = [_py2java(sc, x) for x in obj]
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
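pyspark.mllib.common gets the identical simplification. Dropping the eager conversion is safe only because the upstream fix keeps bytearray out of the list converter; a quick check of that behavior, under the same gateway assumptions as the sketches above:

```python
from py4j.java_gateway import JavaGateway, GatewayParameters

gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))

payload = bytearray(b"\x00\x01\x02")
# bytearray maps to byte[] on the Java side; with py4j >= 0.10.3 the list
# converter no longer claims it, so the bytes survive the round trip intact
copy = gateway.jvm.java.util.Arrays.copyOf(payload, len(payload))
assert bytes(copy) == bytes(payload)
```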
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5fb10f86f4..ed81eb16df 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -52,8 +52,6 @@ from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
-from py4j.java_collections import ListConverter, MapConverter
-
__all__ = ["RDD"]
@@ -2317,16 +2315,9 @@ def _prepare_for_python_RDD(sc, command):
# The broadcast will have same life cycle as created PythonRDD
broadcast = sc.broadcast(pickled_command)
pickled_command = ser.dumps(broadcast)
- # There is a bug in py4j.java_gateway.JavaClass with auto_convert
- # https://github.com/bartdag/py4j/issues/161
- # TODO: use auto_convert once py4j fix the bug
- broadcast_vars = ListConverter().convert(
- [x._jbroadcast for x in sc._pickled_broadcast_vars],
- sc._gateway._gateway_client)
+ broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
sc._pickled_broadcast_vars.clear()
- env = MapConverter().convert(sc.environment, sc._gateway._gateway_client)
- includes = ListConverter().convert(sc._python_includes, sc._gateway._gateway_client)
- return pickled_command, broadcast_vars, env, includes
+ return pickled_command, broadcast_vars, sc.environment, sc._python_includes
def _wrap_function(sc, func, deserializer, serializer, profiler=None):
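The rdd.py hunk also drops MapConverter: auto_convert covers plain dicts as well as lists, which is why sc.environment and sc._python_includes can now be returned unconverted. A minimal sketch of the dict side, under the same gateway assumptions as above:

```python
from py4j.java_gateway import JavaGateway, GatewayParameters

gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))

jmap = gateway.jvm.java.util.HashMap()
jmap.putAll({"SPARK_LOCAL_DIRS": "/tmp"})  # plain dict auto-converts to java.util.Map
assert jmap.get("SPARK_LOCAL_DIRS") == "/tmp"
```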