aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/context.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/context.py')
-rw-r--r--python/pyspark/streaming/context.py11
1 files changed, 3 insertions, 8 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 4590c58839..ac5ba69e8d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -20,7 +20,6 @@ from __future__ import print_function
import os
import sys
-from py4j.java_collections import ListConverter
from py4j.java_gateway import java_import, JavaObject
from pyspark import RDD, SparkConf
@@ -305,9 +304,7 @@ class StreamingContext(object):
rdds = [self._sc.parallelize(input) for input in rdds]
self._check_serializers(rdds)
- jrdds = ListConverter().convert([r._jrdd for r in rdds],
- SparkContext._gateway._gateway_client)
- queue = self._jvm.PythonDStream.toRDDQueue(jrdds)
+ queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
if default:
default = default._reserialize(rdds[0]._jrdd_deserializer)
jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
@@ -322,8 +319,7 @@ class StreamingContext(object):
the transform function parameter will be the same as the order
of corresponding DStreams in the list.
"""
- jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
- SparkContext._gateway._gateway_client)
+ jdstreams = [d._jdstream for d in dstreams]
# change the final serializer to sc.serializer
func = TransformFunction(self._sc,
lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
@@ -346,6 +342,5 @@ class StreamingContext(object):
if len(set(s._slideDuration for s in dstreams)) > 1:
raise ValueError("All DStreams should have same slide duration")
first = dstreams[0]
- jrest = ListConverter().convert([d._jdstream for d in dstreams[1:]],
- SparkContext._gateway._gateway_client)
+ jrest = [d._jdstream for d in dstreams[1:]]
return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)