Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/sql/functions.py  52
-rw-r--r--  python/pyspark/sql/tests.py       9
2 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 02c2350dc2..40727ab12b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -143,6 +143,12 @@ _functions_2_1 = {
                'measured in radians.',
 }
 
+_functions_2_2 = {
+    'to_date': 'Converts a string date into a DateType using the (optionally) specified format.',
+    'to_timestamp': 'Converts a string timestamp into a timestamp type using the ' +
+                    '(optionally) specified format.',
+}
+
 # math functions that take two arguments as input
 _binary_mathfunctions = {
     'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' +
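For context, the name-to-docstring dicts above are expanded into module-level wrappers at import time. Below is a minimal sketch of that generation pattern, reconstructed from the surrounding code in functions.py rather than from this diff; it assumes `_functions_2_2` is consumed by the same kind of loop as `_functions_2_1`. Note that the explicit `def to_date` added further down would take precedence over any generated wrapper, since it is defined later in the module:

    from pyspark import SparkContext, since
    from pyspark.sql.column import Column, _to_java_column

    def _create_function(name, doc=""):
        """Create a one-argument Column function by JVM function name."""
        def _(col):
            sc = SparkContext._active_spark_context
            # Look up the same-named function on the JVM side and wrap the result.
            jc = getattr(sc._jvm.functions, name)(_to_java_column(col))
            return Column(jc)
        _.__name__ = name
        _.__doc__ = doc
        return _

    # Assumed here: a 2.2 loop mirroring the existing _functions_2_1 handling.
    for _name, _doc in _functions_2_2.items():
        globals()[_name] = since(2.2)(_create_function(_name, _doc))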
@@ -976,18 +982,52 @@ def months_between(date1, date2):
     return Column(sc._jvm.functions.months_between(_to_java_column(date1), _to_java_column(date2)))
 
 
-@since(1.5)
-def to_date(col):
-    """
-    Converts the column of :class:`pyspark.sql.types.StringType` or
-    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`.
+@since(2.2)
+def to_date(col, format=None):
+    """Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
+    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`
+    using the optionally specified format. Default format is 'yyyy-MM-dd'.
+    Specify formats according to
+    `SimpleDateFormat <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
 
     >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
     >>> df.select(to_date(df.t).alias('date')).collect()
     [Row(date=datetime.date(1997, 2, 28))]
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
+    [Row(date=datetime.date(1997, 2, 28))]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.to_date(_to_java_column(col)))
+    if format is None:
+        jc = sc._jvm.functions.to_date(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.to_date(_to_java_column(col), format)
+    return Column(jc)
+
+
+@since(2.2)
+def to_timestamp(col, format=None):
+    """Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
+    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.TimestampType`
+    using the optionally specified format. Default format is 'yyyy-MM-dd HH:mm:ss'. Specify
+    formats according to
+    `SimpleDateFormat <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_timestamp(df.t).alias('dt')).collect()
+    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
+    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
+    """
+    sc = SparkContext._active_spark_context
+    if format is None:
+        jc = sc._jvm.functions.to_timestamp(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.to_timestamp(_to_java_column(col), format)
+    return Column(jc)
 
 
 @since(1.5)
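A quick usage sketch of the new optional format argument (assumes a SparkSession is available; the column name `t` and the dd/MM pattern are illustrative only — a non-default pattern is exactly the case the new argument enables):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import to_date, to_timestamp

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([('28/02/1997 10:30:00',)], ['t'])
    row = df.select(to_date(df.t, 'dd/MM/yyyy HH:mm:ss').alias('d'),
                    to_timestamp(df.t, 'dd/MM/yyyy HH:mm:ss').alias('ts')).first()
    # row.d  == datetime.date(1997, 2, 28)
    # row.ts == datetime.datetime(1997, 2, 28, 10, 30)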
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 86cad4b363..710585cbe2 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1807,6 +1807,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue("+" in functions)
         self.assertTrue("like" in functions)
         self.assertTrue("month" in functions)
+        self.assertTrue("to_date" in functions)
+        self.assertTrue("to_timestamp" in functions)
         self.assertTrue("to_unix_timestamp" in functions)
         self.assertTrue("current_database" in functions)
         self.assertEquals(functions["+"], Function(
@@ -2189,6 +2191,13 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         # Regression test for SPARK-17514: limit(n).collect() should perform the same as take(n)
         assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())
 
+    def test_datetime_functions(self):
+        from pyspark.sql import functions
+        from datetime import date, datetime
+        df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
+        parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
+        self.assertEquals(date(2017, 1, 22), parse_result['to_date(dateCol)'])
+
     @unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking")
     def test_unbounded_frames(self):
         from unittest.mock import patch
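The regression test above only exercises to_date with the default format. A hypothetical companion test covering to_timestamp with an explicit format, written in the same style (not part of this patch; the method name and timestamp value are invented for illustration):

    def test_timestamp_parsing_with_format(self):
        from pyspark.sql import functions
        from datetime import datetime
        df = self.spark.range(1).selectExpr("'2017-01-22 14:30:00' as tsCol")
        result = df.select(
            functions.to_timestamp(functions.col("tsCol"),
                                   'yyyy-MM-dd HH:mm:ss').alias('ts')).first()
        self.assertEqual(datetime(2017, 1, 22, 14, 30), result['ts'])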