aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-12 16:45:10 -0700
committerDavies Liu <davies.liu@gmail.com>2016-10-12 16:45:10 -0700
commit6f20a92ca30f9c367009c4556939ea4de4284cb9 (patch)
treed06828b56cee79054bd0b0a355490e76c30a5e89 /python/pyspark/sql
parentf9a56a153e0579283160519065c7f3620d12da3e (diff)
downloadspark-6f20a92ca30f9c367009c4556939ea4de4284cb9.tar.gz
spark-6f20a92ca30f9c367009c4556939ea4de4284cb9.tar.bz2
spark-6f20a92ca30f9c367009c4556939ea4de4284cb9.zip
[SPARK-17845] [SQL] More self-evident window function frame boundary API
## What changes were proposed in this pull request? This patch improves the window function frame boundary API to make it more obvious to read and to use. The two high level changes are: 1. Create Window.currentRow, Window.unboundedPreceding, Window.unboundedFollowing to indicate the special values in frame boundaries. These methods map to the special integral values so we are not breaking backward compatibility here. This change makes the frame boundaries more self-evident (instead of Long.MinValue, it becomes Window.unboundedPreceding). 2. In Python, for any value less than or equal to JVM's Long.MinValue, treat it as Window.unboundedPreceding. For any value larger than or equal to JVM's Long.MaxValue, treat it as Window.unboundedFollowing. Before this change, if the user specifies any value that is less than Long.MinValue but not -sys.maxsize (e.g. -sys.maxsize + 1), the number we pass over to the JVM would overflow, resulting in a frame that does not make sense. Code example required to specify a frame before this patch: ``` Window.rowsBetween(-Long.MinValue, 0) ``` While the above code should still work, the new way is more obvious to read: ``` Window.rowsBetween(Window.unboundedPreceding, Window.currentRow) ``` ## How was this patch tested? - Updated DataFrameWindowSuite (for Scala/Java) - Updated test_window_functions_cumulative_sum (for Python) - Renamed DataFrameWindowSuite DataFrameWindowFunctionsSuite to better reflect its purpose Author: Reynold Xin <rxin@databricks.com> Closes #15438 from rxin/SPARK-17845.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--python/pyspark/sql/tests.py25
-rw-r--r--python/pyspark/sql/window.py89
2 files changed, 84 insertions, 30 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 61674a8a7e..51d5e7ab05 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1876,12 +1876,35 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
def test_window_functions_cumulative_sum(self):
df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
from pyspark.sql import functions as F
- sel = df.select(df.key, F.sum(df.value).over(Window.rowsBetween(-sys.maxsize, 0)))
+
+ # Test cumulative sum
+ sel = df.select(
+ df.key,
+ F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
+ rs = sorted(sel.collect())
+ expected = [("one", 1), ("two", 3)]
+ for r, ex in zip(rs, expected):
+ self.assertEqual(tuple(r), ex[:len(r)])
+
+ # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
+ sel = df.select(
+ df.key,
+ F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
rs = sorted(sel.collect())
expected = [("one", 1), ("two", 3)]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])
+ # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
+ frame_end = Window.unboundedFollowing + 1
+ sel = df.select(
+ df.key,
+ F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
+ rs = sorted(sel.collect())
+ expected = [("one", 3), ("two", 2)]
+ for r, ex in zip(rs, expected):
+ self.assertEqual(tuple(r), ex[:len(r)])
+
def test_collect_functions(self):
df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
from pyspark.sql import functions
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 87e9a98898..c345e623f1 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -36,8 +36,8 @@ class Window(object):
For example:
- >>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- >>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)
+ >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
@@ -46,6 +46,16 @@ class Window(object):
.. versionadded:: 1.4
"""
+
+ _JAVA_MIN_LONG = -(1 << 63) # -9223372036854775808
+ _JAVA_MAX_LONG = (1 << 63) - 1 # 9223372036854775807
+
+ unboundedPreceding = _JAVA_MIN_LONG
+
+ unboundedFollowing = _JAVA_MAX_LONG
+
+ currentRow = 0
+
@staticmethod
@since(1.4)
def partitionBy(*cols):
@@ -77,15 +87,21 @@ class Window(object):
For example, "0" means "current row", while "-1" means the row before
the current row, and "5" means the fifth row after the current row.
+ We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
+
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ any value less than or equal to -9223372036854775808.
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ any value greater than or equal to 9223372036854775807.
"""
- if start <= -sys.maxsize:
- start = WindowSpec._JAVA_MIN_LONG
- if end >= sys.maxsize:
- end = WindowSpec._JAVA_MAX_LONG
+ if start <= Window._JAVA_MIN_LONG:
+ start = Window.unboundedPreceding
+ if end >= Window._JAVA_MAX_LONG:
+ end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
return WindowSpec(jspec)
@@ -101,15 +117,21 @@ class Window(object):
"0" means "current row", while "-1" means one off before the current row,
and "5" means the five off after the current row.
+ We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
+
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ any value less than or equal to -9223372036854775808.
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ any value greater than or equal to 9223372036854775807.
"""
- if start <= -sys.maxsize:
- start = WindowSpec._JAVA_MIN_LONG
- if end >= sys.maxsize:
- end = WindowSpec._JAVA_MAX_LONG
+ if start <= Window._JAVA_MIN_LONG:
+ start = Window.unboundedPreceding
+ if end >= Window._JAVA_MAX_LONG:
+ end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
return WindowSpec(jspec)
@@ -127,9 +149,6 @@ class WindowSpec(object):
.. versionadded:: 1.4
"""
- _JAVA_MAX_LONG = (1 << 63) - 1
- _JAVA_MIN_LONG = - (1 << 63)
-
def __init__(self, jspec):
self._jspec = jspec
@@ -160,15 +179,21 @@ class WindowSpec(object):
For example, "0" means "current row", while "-1" means the row before
the current row, and "5" means the fifth row after the current row.
+ We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
+
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ any value less than or equal to -9223372036854775808.
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ any value greater than or equal to 9223372036854775807.
"""
- if start <= -sys.maxsize:
- start = self._JAVA_MIN_LONG
- if end >= sys.maxsize:
- end = self._JAVA_MAX_LONG
+ if start <= Window._JAVA_MIN_LONG:
+ start = Window.unboundedPreceding
+ if end >= Window._JAVA_MAX_LONG:
+ end = Window.unboundedFollowing
return WindowSpec(self._jspec.rowsBetween(start, end))
@since(1.4)
@@ -180,15 +205,21 @@ class WindowSpec(object):
"0" means "current row", while "-1" means one off before the current row,
and "5" means the five off after the current row.
+ We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
+
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ any value less than or equal to -9223372036854775808.
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ any value greater than or equal to 9223372036854775807.
"""
- if start <= -sys.maxsize:
- start = self._JAVA_MIN_LONG
- if end >= sys.maxsize:
- end = self._JAVA_MAX_LONG
+ if start <= Window._JAVA_MIN_LONG:
+ start = Window.unboundedPreceding
+ if end >= Window._JAVA_MAX_LONG:
+ end = Window.unboundedFollowing
return WindowSpec(self._jspec.rangeBetween(start, end))