aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming
diff options
context:
space:
mode:
author: Davies Liu <davies@databricks.com> 2015-04-21 00:08:18 -0700
committer: Reynold Xin <rxin@databricks.com> 2015-04-21 00:08:18 -0700
commit: ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa (patch)
tree: 88b7b9582617ef0fda39de8c04e9b0fdde030533 /python/pyspark/streaming
parent: 8136810dfad12008ac300116df7bc8448740f1ae (diff)
downloadspark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.gz
spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.bz2
spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.zip
[SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
This PR enables auto_convert in JavaGateway so that we can register a converter for a given type, for example date and datetime.

There are two bugs related to auto_convert, see [1] and [2]; we work around them in this PR.

[1] https://github.com/bartdag/py4j/issues/160
[2] https://github.com/bartdag/py4j/issues/161

cc rxin JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5570 from davies/py4j_date and squashes the following commits:

eb4fa53 [Davies Liu] fix tests in python 3
d17d634 [Davies Liu] rollback changes in mllib
2e7566d [Davies Liu] convert tuple into ArrayList
ceb3779 [Davies Liu] Update rdd.py
3c373f3 [Davies Liu] support date and datetime by auto_convert
cb094ff [Davies Liu] enable auto convert
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r-- python/pyspark/streaming/context.py 11
-rw-r--r-- python/pyspark/streaming/kafka.py 7
-rw-r--r-- python/pyspark/streaming/tests.py 6
3 files changed, 6 insertions, 18 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 4590c58839..ac5ba69e8d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -20,7 +20,6 @@ from __future__ import print_function
import os
import sys
-from py4j.java_collections import ListConverter
from py4j.java_gateway import java_import, JavaObject
from pyspark import RDD, SparkConf
@@ -305,9 +304,7 @@ class StreamingContext(object):
rdds = [self._sc.parallelize(input) for input in rdds]
self._check_serializers(rdds)
- jrdds = ListConverter().convert([r._jrdd for r in rdds],
- SparkContext._gateway._gateway_client)
- queue = self._jvm.PythonDStream.toRDDQueue(jrdds)
+ queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
if default:
default = default._reserialize(rdds[0]._jrdd_deserializer)
jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
@@ -322,8 +319,7 @@ class StreamingContext(object):
the transform function parameter will be the same as the order
of corresponding DStreams in the list.
"""
- jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
- SparkContext._gateway._gateway_client)
+ jdstreams = [d._jdstream for d in dstreams]
# change the final serializer to sc.serializer
func = TransformFunction(self._sc,
lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
@@ -346,6 +342,5 @@ class StreamingContext(object):
if len(set(s._slideDuration for s in dstreams)) > 1:
raise ValueError("All DStreams should have same slide duration")
first = dstreams[0]
- jrest = ListConverter().convert([d._jdstream for d in dstreams[1:]],
- SparkContext._gateway._gateway_client)
+ jrest = [d._jdstream for d in dstreams[1:]]
return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 7a7b6e1d9a..8d610d6569 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,8 +15,7 @@
# limitations under the License.
#
-from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
+from py4j.java_gateway import Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -57,8 +56,6 @@ class KafkaUtils(object):
})
if not isinstance(topics, dict):
raise TypeError("topics should be dict")
- jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
- jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
try:
@@ -66,7 +63,7 @@ class KafkaUtils(object):
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helperClass.newInstance()
- jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+ jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 06d2215437..33f958a601 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -24,8 +24,6 @@ import tempfile
import struct
from functools import reduce
-from py4j.java_collections import MapConverter
-
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
@@ -581,11 +579,9 @@ class KafkaStreamTests(PySparkStreamingTestCase):
"""Test the Python Kafka stream API."""
topic = "topic1"
sendData = {"a": 3, "b": 5, "c": 10}
- jSendData = MapConverter().convert(sendData,
- self.ssc.sparkContext._gateway._gateway_client)
self._kafkaTestUtils.createTopic(topic)
- self._kafkaTestUtils.sendMessages(topic, jSendData)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},