aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-10 22:33:20 -0700
committerHerman van Hovell <hvanhovell@databricks.com>2016-10-10 22:33:20 -0700
commitb515768f2668749ad37a3bdf9d265ce45ec447b1 (patch)
tree80dee4fe6f00f61241947a8a342f1f579e102244 /python/pyspark/sql
parent0c0ad436ad909364915b910867d08262c62bc95d (diff)
downloadspark-b515768f2668749ad37a3bdf9d265ce45ec447b1.tar.gz
spark-b515768f2668749ad37a3bdf9d265ce45ec447b1.tar.bz2
spark-b515768f2668749ad37a3bdf9d265ce45ec447b1.zip
[SPARK-17844] Simplify DataFrame API for defining frame boundaries in window functions
## What changes were proposed in this pull request? When I was creating the example code for SPARK-10496, I realized it was pretty convoluted to define the frame boundaries for window functions when there is no partition column or ordering column. The reason is that we don't provide a way to create a WindowSpec directly with the frame boundaries. We can trivially improve this by adding rowsBetween and rangeBetween to Window object. As an example, to compute cumulative sum using the natural ordering, before this pr: ``` df.select('key, sum("value").over(Window.partitionBy(lit(1)).rowsBetween(Long.MinValue, 0))) ``` After this pr: ``` df.select('key, sum("value").over(Window.rowsBetween(Long.MinValue, 0))) ``` Note that you could argue there is no point specifying a window frame without partitionBy/orderBy -- but it is strange that only rowsBetween and rangeBetween are not the only two APIs not available. This also fixes https://issues.apache.org/jira/browse/SPARK-17656 (removing _root_.scala). ## How was this patch tested? Added test cases to compute cumulative sum in DataFrameWindowSuite for Scala/Java and tests.py for Python. Author: Reynold Xin <rxin@databricks.com> Closes #15412 from rxin/SPARK-17844.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--python/pyspark/sql/tests.py9
-rw-r--r--python/pyspark/sql/window.py48
2 files changed, 57 insertions, 0 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a9e455565a..7b6f9f0ef1 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1859,6 +1859,15 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])
+ def test_window_functions_cumulative_sum(self):
+ df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
+ from pyspark.sql import functions as F
+ sel = df.select(df.key, F.sum(df.value).over(Window.rowsBetween(-sys.maxsize, 0)))
+ rs = sorted(sel.collect())
+ expected = [("one", 1), ("two", 3)]
+ for r, ex in zip(rs, expected):
+ self.assertEqual(tuple(r), ex[:len(r)])
+
def test_collect_functions(self):
df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
from pyspark.sql import functions
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 46663f69a0..87e9a98898 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -66,6 +66,54 @@ class Window(object):
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
return WindowSpec(jspec)
+ @staticmethod
+ @since(2.1)
+ def rowsBetween(start, end):
+ """
+ Creates a :class:`WindowSpec` with the frame boundaries defined,
+ from `start` (inclusive) to `end` (inclusive).
+
+ Both `start` and `end` are relative positions from the current row.
+ For example, "0" means "current row", while "-1" means the row before
+ the current row, and "5" means the fifth row after the current row.
+
+ :param start: boundary start, inclusive.
+ The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ :param end: boundary end, inclusive.
+ The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ """
+ if start <= -sys.maxsize:
+ start = WindowSpec._JAVA_MIN_LONG
+ if end >= sys.maxsize:
+ end = WindowSpec._JAVA_MAX_LONG
+ sc = SparkContext._active_spark_context
+ jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
+ return WindowSpec(jspec)
+
+ @staticmethod
+ @since(2.1)
+ def rangeBetween(start, end):
+ """
+ Creates a :class:`WindowSpec` with the frame boundaries defined,
+ from `start` (inclusive) to `end` (inclusive).
+
+ Both `start` and `end` are relative from the current row. For example,
+ "0" means "current row", while "-1" means one off before the current row,
+ and "5" means the five off after the current row.
+
+ :param start: boundary start, inclusive.
+ The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ :param end: boundary end, inclusive.
+ The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ """
+ if start <= -sys.maxsize:
+ start = WindowSpec._JAVA_MIN_LONG
+ if end >= sys.maxsize:
+ end = WindowSpec._JAVA_MAX_LONG
+ sc = SparkContext._active_spark_context
+ jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
+ return WindowSpec(jspec)
+
class WindowSpec(object):
"""