diff options
author | Davies Liu <davies@databricks.com> | 2015-04-21 00:08:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-21 00:08:18 -0700 |
commit | ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa (patch) | |
tree | 88b7b9582617ef0fda39de8c04e9b0fdde030533 /python/pyspark/context.py | |
parent | 8136810dfad12008ac300116df7bc8448740f1ae (diff) | |
download | spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.gz spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.bz2 spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.zip |
[SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
This PR enables auto_convert in JavaGateway, so that we can register converters for given types, for example, date and datetime.
There are two bugs related to auto_convert, see [1] and [2]; we work around them in this PR.
[1] https://github.com/bartdag/py4j/issues/160
[2] https://github.com/bartdag/py4j/issues/161
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #5570 from davies/py4j_date and squashes the following commits:
eb4fa53 [Davies Liu] fix tests in python 3
d17d634 [Davies Liu] rollback changes in mllib
2e7566d [Davies Liu] convert tuple into ArrayList
ceb3779 [Davies Liu] Update rdd.py
3c373f3 [Davies Liu] support date and datetime by auto_convert
cb094ff [Davies Liu] enable auto convert
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 6 |
1 files changed, 1 insertions, 5 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 6a743ac8bd..b006120eb2 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -23,8 +23,6 @@ import sys from threading import Lock from tempfile import NamedTemporaryFile -from py4j.java_collections import ListConverter - from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast @@ -643,7 +641,6 @@ class SparkContext(object): rdds = [x._reserialize() for x in rdds] first = rdds[0]._jrdd rest = [x._jrdd for x in rdds[1:]] - rest = ListConverter().convert(rest, self._gateway._gateway_client) return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer) def broadcast(self, value): @@ -846,13 +843,12 @@ class SparkContext(object): """ if partitions is None: partitions = range(rdd._jrdd.partitions().size()) - javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) # Implementation note: This is implemented as a mapPartitions followed # by runJob() in order to avoid having to pass a Python lambda into # SparkContext#runJob. mappedRDD = rdd.mapPartitions(partitionFunc) - port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, + port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions, allowLocal) return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) |