author     Davies Liu <davies@databricks.com>       2015-04-21 00:08:18 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-04-21 00:08:18 -0700
commit     ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa
tree       88b7b9582617ef0fda39de8c04e9b0fdde030533 /python/pyspark/streaming
parent     8136810dfad12008ac300116df7bc8448740f1ae
[SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
This PR enables auto_convert in JavaGateway, so we can register a converter for a given type, for example, date and datetime.
There are two bugs related to auto_convert, see [1] and [2]; we work around them in this PR.
[1] https://github.com/bartdag/py4j/issues/160
[2] https://github.com/bartdag/py4j/issues/161
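For reference, the py4j mechanism this relies on looks roughly like the sketch below — a minimal illustration assuming a py4j 0.8.x-style gateway (newer py4j moves auto_convert into GatewayParameters); the converter class shown here is illustrative, not the exact code in this patch:

    import datetime
    from py4j.java_gateway import JavaGateway, JavaClass
    from py4j.protocol import register_input_converter

    # auto_convert makes py4j translate Python lists/dicts/sets into
    # java.util.ArrayList/HashMap/HashSet on every call into the JVM.
    gateway = JavaGateway(auto_convert=True)

    class DateConverter(object):
        """Turns datetime.date arguments into java.sql.Date on the fly."""
        def can_convert(self, obj):
            return isinstance(obj, datetime.date)

        def convert(self, obj, gateway_client):
            Date = JavaClass("java.sql.Date", gateway_client)
            return Date.valueOf(obj.strftime("%Y-%m-%d"))

    # Registered converters are consulted for every argument passed to Java.
    register_input_converter(DateConverter())

Because datetime.datetime is a subclass of datetime.date, a datetime converter must be registered before the date converter so it gets the first chance to match.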
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #5570 from davies/py4j_date and squashes the following commits:
eb4fa53 [Davies Liu] fix tests in python 3
d17d634 [Davies Liu] rollback changes in mllib
2e7566d [Davies Liu] convert tuple into ArrayList
ceb3779 [Davies Liu] Update rdd.py
3c373f3 [Davies Liu] support date and datetime by auto_convert
cb094ff [Davies Liu] enable auto convert
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--  python/pyspark/streaming/context.py | 11
-rw-r--r--  python/pyspark/streaming/kafka.py   |  7
-rw-r--r--  python/pyspark/streaming/tests.py   |  6
3 files changed, 6 insertions, 18 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 4590c58839..ac5ba69e8d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -20,7 +20,6 @@ from __future__ import print_function
 import os
 import sys
 
-from py4j.java_collections import ListConverter
 from py4j.java_gateway import java_import, JavaObject
 
 from pyspark import RDD, SparkConf
@@ -305,9 +304,7 @@ class StreamingContext(object):
             rdds = [self._sc.parallelize(input) for input in rdds]
         self._check_serializers(rdds)
 
-        jrdds = ListConverter().convert([r._jrdd for r in rdds],
-                                        SparkContext._gateway._gateway_client)
-        queue = self._jvm.PythonDStream.toRDDQueue(jrdds)
+        queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
         if default:
             default = default._reserialize(rdds[0]._jrdd_deserializer)
             jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
@@ -322,8 +319,7 @@ class StreamingContext(object):
         the transform function parameter will be the same as the order
         of corresponding DStreams in the list.
         """
-        jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
-                                            SparkContext._gateway._gateway_client)
+        jdstreams = [d._jdstream for d in dstreams]
         # change the final serializer to sc.serializer
         func = TransformFunction(self._sc,
                                  lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
@@ -346,6 +342,5 @@ class StreamingContext(object):
         if len(set(s._slideDuration for s in dstreams)) > 1:
             raise ValueError("All DStreams should have same slide duration")
         first = dstreams[0]
-        jrest = ListConverter().convert([d._jdstream for d in dstreams[1:]],
-                                        SparkContext._gateway._gateway_client)
+        jrest = [d._jdstream for d in dstreams[1:]]
         return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 7a7b6e1d9a..8d610d6569 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,8 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
+from py4j.java_gateway import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -57,8 +56,6 @@ class KafkaUtils(object):
         })
         if not isinstance(topics, dict):
             raise TypeError("topics should be dict")
-        jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
-        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
 
         try:
@@ -66,7 +63,7 @@ class KafkaUtils(object):
             helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                 .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
             helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+            jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
         except Py4JJavaError as e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 06d2215437..33f958a601 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -24,8 +24,6 @@ import tempfile
 import struct
 from functools import reduce
 
-from py4j.java_collections import MapConverter
-
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import KafkaUtils
@@ -581,11 +579,9 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         """Test the Python Kafka stream API."""
         topic = "topic1"
         sendData = {"a": 3, "b": 5, "c": 10}
-        jSendData = MapConverter().convert(sendData,
-                                           self.ssc.sparkContext._gateway._gateway_client)
 
         self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, jSendData)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
 
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
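With auto-conversion in place, plain Python collections and date/datetime values flow straight through py4j, which is what makes Date/Timestamp usable in Column expressions. A hypothetical usage sketch (the DataFrame, its when column, and the pre-existing SparkContext sc are made up for illustration):

    import datetime
    from pyspark.sql import SQLContext, Row

    sqlContext = SQLContext(sc)  # assumes an already-running SparkContext `sc`
    df = sqlContext.createDataFrame([
        Row(name="a", when=datetime.date(2015, 4, 1)),
        Row(name="b", when=datetime.date(2015, 4, 21)),
    ])

    # The date literal on the right is auto-converted to java.sql.Date by a
    # registered py4j input converter, so the Column comparison works end to end.
    df.filter(df["when"] > datetime.date(2015, 4, 10)).show()

The same mechanism is why the code above can now pass kafkaParams, topics, and sendData as plain Python dicts instead of pre-converting them with MapConverter.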