path: root/python/pyspark/sql/dataframe.py
author	Davies Liu <davies@databricks.com>	2015-04-21 00:08:18 -0700
committer	Reynold Xin <rxin@databricks.com>	2015-04-21 00:08:18 -0700
commit	ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa (patch)
tree	88b7b9582617ef0fda39de8c04e9b0fdde030533 /python/pyspark/sql/dataframe.py
parent	8136810dfad12008ac300116df7bc8448740f1ae (diff)
[SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
This PR enables auto_convert in JavaGateway, so that we can register a converter for given types, for example, date and datetime. There are two bugs related to auto_convert, see [1] and [2]; we work around them in this PR.

[1] https://github.com/bartdag/py4j/issues/160
[2] https://github.com/bartdag/py4j/issues/161

cc rxin JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5570 from davies/py4j_date and squashes the following commits:

eb4fa53 [Davies Liu] fix tests in python 3
d17d634 [Davies Liu] rollback changes in mllib
2e7566d [Davies Liu] convert tuple into ArrayList
ceb3779 [Davies Liu] Update rdd.py
3c373f3 [Davies Liu] support date and datetime by auto_convert
cb094ff [Davies Liu] enable auto convert
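For context, a minimal sketch of the py4j input-converter mechanism this change relies on. It assumes the py4j 0.8.x-era API that Spark shipped at the time; the class name DateConverter and the gateway construction below are illustrative, not the exact code added to PySpark:

    import datetime

    from py4j.java_gateway import JavaGateway, JavaClass
    from py4j.protocol import register_input_converter

    class DateConverter(object):
        """Sketch of a py4j input converter for datetime.date."""

        def can_convert(self, obj):
            return isinstance(obj, datetime.date)

        def convert(self, obj, gateway_client):
            # Turn a Python date into a java.sql.Date on the JVM side.
            Date = JavaClass("java.sql.Date", gateway_client)
            return Date.valueOf(obj.strftime("%Y-%m-%d"))

    # Once registered, py4j consults the converter for every argument
    # sent to the JVM, provided auto_convert is enabled on the gateway.
    register_input_converter(DateConverter())

    # auto_convert=True also converts plain Python lists/dicts into
    # java.util.List/Map automatically, which is what makes the explicit
    # ListConverter/MapConverter calls in the diff below removable.
    gateway = JavaGateway(auto_convert=True)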
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r--	python/pyspark/sql/dataframe.py	18
1 file changed, 4 insertions(+), 14 deletions(-)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 75c181c0c7..ca9bf8efb9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -25,8 +25,6 @@ if sys.version >= '3':
 else:
     from itertools import imap as map
 
-from py4j.java_collections import ListConverter, MapConverter
-
 from pyspark.context import SparkContext
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
@@ -186,9 +184,7 @@ class DataFrame(object):
         source = self.sql_ctx.getConf("spark.sql.sources.default",
                                       "org.apache.spark.sql.parquet")
         jmode = self._java_save_mode(mode)
-        joptions = MapConverter().convert(options,
-                                          self.sql_ctx._sc._gateway._gateway_client)
-        self._jdf.saveAsTable(tableName, source, jmode, joptions)
+        self._jdf.saveAsTable(tableName, source, jmode, options)
 
     def save(self, path=None, source=None, mode="error", **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
@@ -211,9 +207,7 @@ class DataFrame(object):
         source = self.sql_ctx.getConf("spark.sql.sources.default",
                                       "org.apache.spark.sql.parquet")
         jmode = self._java_save_mode(mode)
-        joptions = MapConverter().convert(options,
-                                          self._sc._gateway._gateway_client)
-        self._jdf.save(source, jmode, joptions)
+        self._jdf.save(source, jmode, options)
 
     @property
     def schema(self):
@@ -819,7 +813,6 @@ class DataFrame(object):
             value = float(value)
 
         if isinstance(value, dict):
-            value = MapConverter().convert(value, self.sql_ctx._sc._gateway._gateway_client)
             return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
         elif subset is None:
             return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
@@ -932,9 +925,7 @@ class GroupedData(object):
         """
         assert exprs, "exprs should not be empty"
         if len(exprs) == 1 and isinstance(exprs[0], dict):
-            jmap = MapConverter().convert(exprs[0],
-                                          self.sql_ctx._sc._gateway._gateway_client)
-            jdf = self._jdf.agg(jmap)
+            jdf = self._jdf.agg(exprs[0])
         else:
             # Columns
             assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
@@ -1040,8 +1031,7 @@ def _to_seq(sc, cols, converter=None):
     """
     if converter:
         cols = [converter(c) for c in cols]
-    jcols = ListConverter().convert(cols, sc._gateway._gateway_client)
-    return sc._jvm.PythonUtils.toSeq(jcols)
+    return sc._jvm.PythonUtils.toSeq(cols)
 
 
 def _unary_op(name, doc="unary operator"):
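As a hedged illustration of what the change enables end to end (df is an assumed DataFrame with date and time columns; none of this appears in the commit itself):

    import datetime

    # A plain dict now crosses the gateway as a java.util.Map,
    # with no explicit MapConverter call.
    df.fillna({"age": 50, "name": "unknown"})

    # Date/datetime literals work directly in Column expressions (SPARK-6949).
    df.filter(df.time > datetime.datetime(2015, 4, 21, 0, 0))
    df.where(df.date == datetime.date(2015, 4, 21))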