aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzero323 <zero323@users.noreply.github.com>2016-12-02 17:39:28 -0800
committerReynold Xin <rxin@databricks.com>2016-12-02 17:39:28 -0800
commita9cbfc4f6a8db936215fcf64697d5b65f13f666e (patch)
tree4ea20839c22b2297b889421a36b62da4dbea66e6
parent2dc0d7efe3380a5763cb69ef346674a46f8e3d57 (diff)
downloadspark-a9cbfc4f6a8db936215fcf64697d5b65f13f666e.tar.gz
spark-a9cbfc4f6a8db936215fcf64697d5b65f13f666e.tar.bz2
spark-a9cbfc4f6a8db936215fcf64697d5b65f13f666e.zip
[SPARK-18690][PYTHON][SQL] Backward compatibility of unbounded frames
## What changes were proposed in this pull request? Makes `Window.unboundedPreceding` and `Window.unboundedFollowing` backward compatible. ## How was this patch tested? Pyspark SQL unittests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: zero323 <zero323@users.noreply.github.com> Closes #16123 from zero323/SPARK-17845-follow-up.
-rw-r--r--python/pyspark/sql/tests.py35
-rw-r--r--python/pyspark/sql/window.py30
2 files changed, 51 insertions, 14 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b7b2a5923c..0aff9cebe9 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1980,6 +1980,41 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
# Regression test for SPARK-17514: limit(n).collect() should the perform same as take(n)
assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())
+ @unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking")
+ def test_unbounded_frames(self):
+ from unittest.mock import patch
+ from pyspark.sql import functions as F
+ from pyspark.sql import window
+ import importlib
+
+ df = self.spark.range(0, 3)
+
+ def rows_frame_match():
+ return "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
+ F.count("*").over(window.Window.rowsBetween(-sys.maxsize, sys.maxsize))
+ ).columns[0]
+
+ def range_frame_match():
+ return "RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
+ F.count("*").over(window.Window.rangeBetween(-sys.maxsize, sys.maxsize))
+ ).columns[0]
+
+ with patch("sys.maxsize", 2 ** 31 - 1):
+ importlib.reload(window)
+ self.assertTrue(rows_frame_match())
+ self.assertTrue(range_frame_match())
+
+ with patch("sys.maxsize", 2 ** 63 - 1):
+ importlib.reload(window)
+ self.assertTrue(rows_frame_match())
+ self.assertTrue(range_frame_match())
+
+ with patch("sys.maxsize", 2 ** 127 - 1):
+ importlib.reload(window)
+ self.assertTrue(rows_frame_match())
+ self.assertTrue(range_frame_match())
+
+ importlib.reload(window)
if __name__ == "__main__":
from pyspark.sql.tests import *
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index c345e623f1..7ce27f9b10 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -49,6 +49,8 @@ class Window(object):
_JAVA_MIN_LONG = -(1 << 63) # -9223372036854775808
_JAVA_MAX_LONG = (1 << 63) - 1 # 9223372036854775807
+ _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
+ _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)
unboundedPreceding = _JAVA_MIN_LONG
@@ -98,9 +100,9 @@ class Window(object):
The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to 9223372036854775807.
"""
- if start <= Window._JAVA_MIN_LONG:
+ if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
- if end >= Window._JAVA_MAX_LONG:
+ if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
@@ -123,14 +125,14 @@ class Window(object):
:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
- any value less than or equal to -9223372036854775808.
+ any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
- any value greater than or equal to 9223372036854775807.
+ any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
- if start <= Window._JAVA_MIN_LONG:
+ if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
- if end >= Window._JAVA_MAX_LONG:
+ if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
@@ -185,14 +187,14 @@ class WindowSpec(object):
:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
- any value less than or equal to -9223372036854775808.
+ any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
- any value greater than or equal to 9223372036854775807.
+ any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
- if start <= Window._JAVA_MIN_LONG:
+ if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
- if end >= Window._JAVA_MAX_LONG:
+ if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
return WindowSpec(self._jspec.rowsBetween(start, end))
@@ -211,14 +213,14 @@ class WindowSpec(object):
:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
- any value less than or equal to -9223372036854775808.
+ any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
- any value greater than or equal to 9223372036854775807.
+ any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
- if start <= Window._JAVA_MIN_LONG:
+ if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
- if end >= Window._JAVA_MAX_LONG:
+ if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
return WindowSpec(self._jspec.rangeBetween(start, end))