diff options
author | Davies Liu <davies@databricks.com> | 2015-04-21 00:08:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-21 00:08:18 -0700 |
commit | ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa (patch) | |
tree | 88b7b9582617ef0fda39de8c04e9b0fdde030533 /python/pyspark/sql/dataframe.py | |
parent | 8136810dfad12008ac300116df7bc8448740f1ae (diff) | |
download | spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.gz spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.tar.bz2 spark-ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa.zip |
[SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
This PR enable auto_convert in JavaGateway, then we could register a converter for a given types, for example, date and datetime.
There are two bugs related to auto_convert, see [1] and [2], we workaround it in this PR.
[1] https://github.com/bartdag/py4j/issues/160
[2] https://github.com/bartdag/py4j/issues/161
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #5570 from davies/py4j_date and squashes the following commits:
eb4fa53 [Davies Liu] fix tests in python 3
d17d634 [Davies Liu] rollback changes in mllib
2e7566d [Davies Liu] convert tuple into ArrayList
ceb3779 [Davies Liu] Update rdd.py
3c373f3 [Davies Liu] support date and datetime by auto_convert
cb094ff [Davies Liu] enable auto convert
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 18 |
1 files changed, 4 insertions, 14 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 75c181c0c7..ca9bf8efb9 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -25,8 +25,6 @@ if sys.version >= '3': else: from itertools import imap as map -from py4j.java_collections import ListConverter, MapConverter - from pyspark.context import SparkContext from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer @@ -186,9 +184,7 @@ class DataFrame(object): source = self.sql_ctx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") jmode = self._java_save_mode(mode) - joptions = MapConverter().convert(options, - self.sql_ctx._sc._gateway._gateway_client) - self._jdf.saveAsTable(tableName, source, jmode, joptions) + self._jdf.saveAsTable(tableName, source, jmode, options) def save(self, path=None, source=None, mode="error", **options): """Saves the contents of the :class:`DataFrame` to a data source. @@ -211,9 +207,7 @@ class DataFrame(object): source = self.sql_ctx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") jmode = self._java_save_mode(mode) - joptions = MapConverter().convert(options, - self._sc._gateway._gateway_client) - self._jdf.save(source, jmode, joptions) + self._jdf.save(source, jmode, options) @property def schema(self): @@ -819,7 +813,6 @@ class DataFrame(object): value = float(value) if isinstance(value, dict): - value = MapConverter().convert(value, self.sql_ctx._sc._gateway._gateway_client) return DataFrame(self._jdf.na().fill(value), self.sql_ctx) elif subset is None: return DataFrame(self._jdf.na().fill(value), self.sql_ctx) @@ -932,9 +925,7 @@ class GroupedData(object): """ assert exprs, "exprs should not be empty" if len(exprs) == 1 and isinstance(exprs[0], dict): - jmap = MapConverter().convert(exprs[0], - self.sql_ctx._sc._gateway._gateway_client) - jdf = self._jdf.agg(jmap) + jdf = self._jdf.agg(exprs[0]) else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" @@ -1040,8 +1031,7 @@ def _to_seq(sc, cols, converter=None): """ if converter: cols = [converter(c) for c in cols] - jcols = ListConverter().convert(cols, sc._gateway._gateway_client) - return sc._jvm.PythonUtils.toSeq(jcols) + return sc._jvm.PythonUtils.toSeq(cols) def _unary_op(name, doc="unary operator"): |