diff options
author | Davies Liu <davies@databricks.com> | 2015-06-11 01:00:41 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-06-11 01:00:41 -0700 |
commit | 424b0075a1a31c251451c6a75c6ba8e81c39453d (patch) | |
tree | 2b199ad2664963648c4caf83fae93391922606f0 /python/pyspark | |
parent | 6b68366df345d4572cf138f9efe17e23d0d1971e (diff) | |
download | spark-424b0075a1a31c251451c6a75c6ba8e81c39453d.tar.gz spark-424b0075a1a31c251451c6a75c6ba8e81c39453d.tar.bz2 spark-424b0075a1a31c251451c6a75c6ba8e81c39453d.zip |
[SPARK-6411] [SQL] [PySpark] support date/datetime with timezone in Python
Spark SQL does not support timezone, and Pyrolite does not support timezone well. This patch will convert datetime into POSIX timestamp (without confusing of timezone), which is used by SQL. If the datetime object does not have timezone, it's treated as local time.
The timezone in RDD will be lost after one round trip, all the datetime from SQL will be local time.
Because of Pyrolite, datetime from SQL only has precision as 1 millisecond.
This PR also drop the timezone in date, convert it to number of days since epoch (used in SQL).
Author: Davies Liu <davies@databricks.com>
Closes #6250 from davies/tzone and squashes the following commits:
44d8497 [Davies Liu] add timezone support for DateType
99d9d9c [Davies Liu] use int for timestamp
10aa7ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into tzone
6a29aa4 [Davies Liu] support datetime with timezone
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/sql/tests.py | 32 | ||||
-rw-r--r-- | python/pyspark/sql/types.py | 27 |
2 files changed, 50 insertions, 9 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a6fce50c76..b5fbb7d098 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -26,6 +26,7 @@ import shutil import tempfile import pickle import functools +import time import datetime import py4j @@ -47,6 +48,20 @@ from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.window import Window +class UTC(datetime.tzinfo): + """UTC""" + ZERO = datetime.timedelta(0) + + def utcoffset(self, dt): + return self.ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return self.ZERO + + class ExamplePointUDT(UserDefinedType): """ User-defined type (UDT) for ExamplePoint. @@ -588,6 +603,23 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(0, df.filter(df.date > date).count()) self.assertEqual(0, df.filter(df.time > time).count()) + def test_time_with_timezone(self): + day = datetime.date.today() + now = datetime.datetime.now() + ts = time.mktime(now.timetuple()) + now.microsecond / 1e6 + # class in __main__ is not serializable + from pyspark.sql.tests import UTC + utc = UTC() + utcnow = datetime.datetime.fromtimestamp(ts, utc) + df = self.sqlCtx.createDataFrame([(day, now, utcnow)]) + day1, now1, utcnow1 = df.first() + # Pyrolite serialize java.sql.Date as datetime, will be fixed in new version + self.assertEqual(day1.date(), day) + # Pyrolite does not support microsecond, the error should be + # less than 1 millisecond + self.assertTrue(now - now1 < datetime.timedelta(0.001)) + self.assertTrue(now - utcnow1 < datetime.timedelta(0.001)) + def test_dropna(self): schema = StructType([ StructField("name", StringType(), True), diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 8f286b631f..23d9adb0da 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType): _need_python_to_sql_conversion(dataType.valueType) elif isinstance(dataType, UserDefinedType): return True - elif isinstance(dataType, TimestampType): + elif isinstance(dataType, (DateType, TimestampType)): return True else: return False +EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() + + def _python_to_sql_converter(dataType): """ Returns a converter that converts a Python object into a SQL datum for the given type. @@ -698,26 +701,32 @@ def _python_to_sql_converter(dataType): return tuple(c(d.get(n)) for n, c in zip(names, converters)) else: return tuple(c(v) for c, v in zip(converters, obj)) - else: + elif obj is not None: raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType)) return converter elif isinstance(dataType, ArrayType): element_converter = _python_to_sql_converter(dataType.elementType) - return lambda a: [element_converter(v) for v in a] + return lambda a: a and [element_converter(v) for v in a] elif isinstance(dataType, MapType): key_converter = _python_to_sql_converter(dataType.keyType) value_converter = _python_to_sql_converter(dataType.valueType) - return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) + return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) + elif isinstance(dataType, UserDefinedType): - return lambda obj: dataType.serialize(obj) + return lambda obj: obj and dataType.serialize(obj) + + elif isinstance(dataType, DateType): + return lambda d: d and d.toordinal() - EPOCH_ORDINAL + elif isinstance(dataType, TimestampType): def to_posix_timstamp(dt): - if dt.tzinfo is None: - return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10) - else: - return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10) + if dt: + seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) + return int(seconds * 1e7 + dt.microsecond * 10) return to_posix_timstamp + else: raise ValueError("Unexpected type %r" % dataType) |