aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/types.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-06-11 01:00:41 -0700
committerReynold Xin <rxin@databricks.com>2015-06-11 01:00:41 -0700
commit424b0075a1a31c251451c6a75c6ba8e81c39453d (patch)
tree2b199ad2664963648c4caf83fae93391922606f0 /python/pyspark/sql/types.py
parent6b68366df345d4572cf138f9efe17e23d0d1971e (diff)
downloadspark-424b0075a1a31c251451c6a75c6ba8e81c39453d.tar.gz
spark-424b0075a1a31c251451c6a75c6ba8e81c39453d.tar.bz2
spark-424b0075a1a31c251451c6a75c6ba8e81c39453d.zip
[SPARK-6411] [SQL] [PySpark] support date/datetime with timezone in Python
Spark SQL does not support timezones, and Pyrolite does not handle timezones well. This patch converts datetime objects into POSIX timestamps (avoiding timezone confusion), which is the representation used by SQL. If the datetime object has no timezone, it is treated as local time. The timezone in an RDD is lost after one round trip; all datetimes coming from SQL are in local time. Because of Pyrolite, datetimes from SQL only have millisecond precision. This PR also drops the timezone from dates, converting them to the number of days since the epoch (as used in SQL). Author: Davies Liu <davies@databricks.com> Closes #6250 from davies/tzone and squashes the following commits: 44d8497 [Davies Liu] add timezone support for DateType 99d9d9c [Davies Liu] use int for timestamp 10aa7ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into tzone 6a29aa4 [Davies Liu] support datetime with timezone
Diffstat (limited to 'python/pyspark/sql/types.py')
-rw-r--r--python/pyspark/sql/types.py27
1 file changed, 18 insertions, 9 deletions
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 8f286b631f..23d9adb0da 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
- elif isinstance(dataType, TimestampType):
+ elif isinstance(dataType, (DateType, TimestampType)):
return True
else:
return False
+EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+
+
def _python_to_sql_converter(dataType):
"""
Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def _python_to_sql_converter(dataType):
return tuple(c(d.get(n)) for n, c in zip(names, converters))
else:
return tuple(c(v) for c, v in zip(converters, obj))
- else:
+ elif obj is not None:
raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
return converter
elif isinstance(dataType, ArrayType):
element_converter = _python_to_sql_converter(dataType.elementType)
- return lambda a: [element_converter(v) for v in a]
+ return lambda a: a and [element_converter(v) for v in a]
elif isinstance(dataType, MapType):
key_converter = _python_to_sql_converter(dataType.keyType)
value_converter = _python_to_sql_converter(dataType.valueType)
- return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+ return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+
elif isinstance(dataType, UserDefinedType):
- return lambda obj: dataType.serialize(obj)
+ return lambda obj: obj and dataType.serialize(obj)
+
+ elif isinstance(dataType, DateType):
+ return lambda d: d and d.toordinal() - EPOCH_ORDINAL
+
elif isinstance(dataType, TimestampType):
def to_posix_timstamp(dt):
- if dt.tzinfo is None:
- return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
- else:
- return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
+ if dt:
+ seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+ else time.mktime(dt.timetuple()))
+ return int(seconds * 1e7 + dt.microsecond * 10)
return to_posix_timstamp
+
else:
raise ValueError("Unexpected type %r" % dataType)