path: root/python
authorDavies Liu <davies@databricks.com>2015-05-23 08:30:05 -0700
committerYin Huai <yhuai@databricks.com>2015-05-23 08:30:05 -0700
commitefe3bfdf496aa6206ace2697e31dd4c0c3c824fb (patch)
treea6c0adbff3ff029c0e87ceff4180f6b3c99ea5ff /python
parentad0badba1450295982738934da2cc121cde18213 (diff)
[SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates
1. ntile should take an integer as parameter.
2. Added Python API (based on #6364)
3. Update documentation of various DataFrame Python functions.

Author: Davies Liu <davies@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6374 from rxin/window-final and squashes the following commits:

69004c7 [Reynold Xin] Style fix.
288cea9 [Reynold Xin] Update documentation.
7cb8985 [Reynold Xin] Merge pull request #6364 from davies/window
66092b4 [Davies Liu] update docs
ed73cb4 [Reynold Xin] [SPARK-7322][SQL] Improve DataFrame window function documentation.
ef55132 [Davies Liu] Merge branch 'master' of github.com:apache/spark into window4
8936ade [Davies Liu] fix maxint in python 3
2649358 [Davies Liu] update docs
778e2c0 [Davies Liu] SPARK-7836 and SPARK-7822: Python API of window functions
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/__init__.py  |  25
-rw-r--r--  python/pyspark/sql/column.py    |  54
-rw-r--r--  python/pyspark/sql/context.py   |   2
-rw-r--r--  python/pyspark/sql/dataframe.py |   2
-rw-r--r--  python/pyspark/sql/functions.py | 147
-rw-r--r--  python/pyspark/sql/group.py     |   2
-rw-r--r--  python/pyspark/sql/tests.py     |  31
-rw-r--r--  python/pyspark/sql/window.py    | 158
8 files changed, 365 insertions, 56 deletions
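
A minimal, hedged sketch of how the Python window function API introduced by this patch fits together. Everything outside the Window/functions calls is an assumption for illustration only: a local HiveContext named sqlCtx (the note in column.py says window functions require HiveContext in 1.4) and a made-up department/salary DataFrame.

    import sys

    from pyspark import SparkContext
    from pyspark.sql import HiveContext, Window
    from pyspark.sql import functions as F

    sc = SparkContext("local[2]", "window-sketch")
    sqlCtx = HiveContext(sc)  # window functions need HiveContext in 1.4

    # Illustrative data, not from the patch.
    df = sqlCtx.createDataFrame(
        [("Alice", "eng", 100), ("Bob", "eng", 80), ("Carol", "ops", 90)],
        ["name", "dept", "salary"])

    # PARTITION BY dept ORDER BY salary DESC
    w = Window.partitionBy("dept").orderBy(df.salary.desc())

    # Rank within each department and compare to the department maximum.
    df.select("name", "dept", "salary",
              F.rank().over(w).alias("rank"),
              F.max("salary").over(w.rowsBetween(-sys.maxsize, sys.maxsize))
               .alias("dept_max")).show()
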
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 66b0bff290..8fee92ae3a 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -18,26 +18,28 @@
"""
Important classes of Spark SQL and DataFrames:
- - L{SQLContext}
+ - :class:`pyspark.sql.SQLContext`
Main entry point for :class:`DataFrame` and SQL functionality.
- - L{DataFrame}
+ - :class:`pyspark.sql.DataFrame`
A distributed collection of data grouped into named columns.
- - L{Column}
+ - :class:`pyspark.sql.Column`
A column expression in a :class:`DataFrame`.
- - L{Row}
+ - :class:`pyspark.sql.Row`
A row of data in a :class:`DataFrame`.
- - L{HiveContext}
+ - :class:`pyspark.sql.HiveContext`
Main entry point for accessing data stored in Apache Hive.
- - L{GroupedData}
+ - :class:`pyspark.sql.GroupedData`
Aggregation methods, returned by :func:`DataFrame.groupBy`.
- - L{DataFrameNaFunctions}
+ - :class:`pyspark.sql.DataFrameNaFunctions`
Methods for handling missing data (null values).
- - L{DataFrameStatFunctions}
+ - :class:`pyspark.sql.DataFrameStatFunctions`
Methods for statistics functionality.
- - L{functions}
+ - :class:`pyspark.sql.functions`
List of built-in functions available for :class:`DataFrame`.
- - L{types}
+ - :class:`pyspark.sql.types`
List of data types available.
+ - :class:`pyspark.sql.Window`
+ For working with window functions.
"""
from __future__ import absolute_import
@@ -66,8 +68,9 @@ from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions
from pyspark.sql.group import GroupedData
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
+from pyspark.sql.window import Window, WindowSpec
__all__ = [
'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
- 'DataFrameNaFunctions', 'DataFrameStatFunctions'
+ 'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
]
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index baf1ecbd0a..8dc5039f58 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -116,6 +116,8 @@ class Column(object):
df.colName + 1
1 / df.colName
+ .. note:: Experimental
+
.. versionadded:: 1.3
"""
@@ -164,8 +166,9 @@ class Column(object):
@since(1.3)
def getItem(self, key):
- """An expression that gets an item at position `ordinal` out of a list,
- or gets an item by key out of a dict.
+ """
+ An expression that gets an item at position ``ordinal`` out of a list,
+ or gets an item by key out of a dict.
>>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
>>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
@@ -185,7 +188,8 @@ class Column(object):
@since(1.3)
def getField(self, name):
- """An expression that gets a field by name in a StructField.
+ """
+ An expression that gets a field by name in a StructField.
>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
@@ -219,7 +223,7 @@ class Column(object):
@since(1.3)
def substr(self, startPos, length):
"""
- Return a :class:`Column` which is a substring of the column
+ Return a :class:`Column` which is a substring of the column.
:param startPos: start position (int or Column)
:param length: length of the substring (int or Column)
@@ -242,7 +246,8 @@ class Column(object):
@ignore_unicode_prefix
@since(1.3)
def inSet(self, *cols):
- """ A boolean expression that is evaluated to true if the value of this
+ """
+ A boolean expression that is evaluated to true if the value of this
expression is contained by the evaluated values of the arguments.
>>> df[df.name.inSet("Bob", "Mike")].collect()
@@ -268,7 +273,8 @@ class Column(object):
@since(1.3)
def alias(self, *alias):
- """Returns this column aliased with a new name or names (in the case of expressions that
+ """
+ Returns this column aliased with a new name or names (in the case of expressions that
return more than one column, such as explode).
>>> df.select(df.age.alias("age2")).collect()
@@ -284,7 +290,7 @@ class Column(object):
@ignore_unicode_prefix
@since(1.3)
def cast(self, dataType):
- """ Convert the column into type `dataType`
+ """ Convert the column into type ``dataType``.
>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
@@ -304,25 +310,24 @@ class Column(object):
astype = cast
- @ignore_unicode_prefix
@since(1.3)
def between(self, lowerBound, upperBound):
- """ A boolean expression that is evaluated to true if the value of this
+ """
+ A boolean expression that is evaluated to true if the value of this
expression is between the given columns.
"""
return (self >= lowerBound) & (self <= upperBound)
- @ignore_unicode_prefix
@since(1.4)
def when(self, condition, value):
- """Evaluates a list of conditions and returns one of multiple possible result expressions.
+ """
+ Evaluates a list of conditions and returns one of multiple possible result expressions.
If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
See :func:`pyspark.sql.functions.when` for example usage.
:param condition: a boolean :class:`Column` expression.
:param value: a literal value, or a :class:`Column` expression.
-
"""
sc = SparkContext._active_spark_context
if not isinstance(condition, Column):
@@ -331,10 +336,10 @@ class Column(object):
jc = sc._jvm.functions.when(condition._jc, v)
return Column(jc)
- @ignore_unicode_prefix
@since(1.4)
def otherwise(self, value):
- """Evaluates a list of conditions and returns one of multiple possible result expressions.
+ """
+ Evaluates a list of conditions and returns one of multiple possible result expressions.
If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
See :func:`pyspark.sql.functions.when` for example usage.
@@ -345,6 +350,27 @@ class Column(object):
jc = self._jc.otherwise(value)
return Column(jc)
+ @since(1.4)
+ def over(self, window):
+ """
+ Define a windowing column.
+
+ :param window: a :class:`WindowSpec`
+ :return: a Column
+
+ >>> from pyspark.sql import Window
+ >>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
+ >>> from pyspark.sql.functions import rank, min
+ >>> # df.select(rank().over(window), min('age').over(window))
+
+ .. note:: Window functions are only supported with HiveContext in 1.4
+ """
+ from pyspark.sql.window import WindowSpec
+ if not isinstance(window, WindowSpec):
+ raise TypeError("window should be WindowSpec")
+ jc = self._jc.over(window._jspec)
+ return Column(jc)
+
def __repr__(self):
return 'Column<%s>' % self._jc.toString().encode('utf8')
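
For reference, a small sketch expanding the commented doctest in Column.over above. It assumes the doctest fixture df with "name" and "age" columns and a HiveContext-backed session; the aliases are illustrative.

    from pyspark.sql import Window
    from pyspark.sql.functions import rank, min

    window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)

    # over() only accepts a WindowSpec; anything else raises TypeError per the check above.
    df.select(df.name, df.age,
              rank().over(window).alias("rank"),
              min("age").over(window).alias("min_age")).collect()
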
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 51f12c5bb4..22f6257dfe 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -585,8 +585,6 @@ class SQLContext(object):
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.
- .. note:: Experimental
-
>>> sqlContext.read
<pyspark.sql.readwriter.DataFrameReader object at ...>
"""
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 132db90e69..55cad8238e 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -62,6 +62,8 @@ class DataFrame(object):
people.filter(people.age > 30).join(department, people.deptId == department.id)) \
.groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
+ .. note:: Experimental
+
.. versionadded:: 1.3
"""
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9b0d7f3e66..bbf465aca8 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -32,16 +32,21 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
__all__ = [
+ 'array',
'approxCountDistinct',
'coalesce',
'countDistinct',
+ 'explode',
'monotonicallyIncreasingId',
'rand',
'randn',
'sparkPartitionId',
+ 'struct',
'udf',
'when']
+__all__ += ['lag', 'lead', 'ntile']
+
def _create_function(name, doc=""):
""" Create a function for aggregator by name"""
@@ -67,6 +72,17 @@ def _create_binary_mathfunction(name, doc=""):
return _
+def _create_window_function(name, doc=''):
+ """ Create a window function by name """
+ def _():
+ sc = SparkContext._active_spark_context
+ jc = getattr(sc._jvm.functions, name)()
+ return Column(jc)
+ _.__name__ = name
+ _.__doc__ = 'Window function: ' + doc
+ return _
+
+
_functions = {
'lit': 'Creates a :class:`Column` of literal value.',
'col': 'Returns a :class:`Column` based on the given column name.',
@@ -130,15 +146,53 @@ _binary_mathfunctions = {
'pow': 'Returns the value of the first argument raised to the power of the second argument.'
}
+_window_functions = {
+ 'rowNumber':
+ """returns a sequential number starting at 1 within a window partition.
+
+ This is equivalent to the ROW_NUMBER function in SQL.""",
+ 'denseRank':
+ """returns the rank of rows within a window partition, without any gaps.
+
+ The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+ sequence when there are ties. That is, if you were ranking a competition using denseRank
+ and had three people tie for second place, you would say that all three were in second
+ place and that the next person came in third.
+
+ This is equivalent to the DENSE_RANK function in SQL.""",
+ 'rank':
+ """returns the rank of rows within a window partition.
+
+ The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+ sequence when there are ties. That is, if you were ranking a competition using denseRank
+ and had three people tie for second place, you would say that all three were in second
+ place and that the next person came in third. Rank, by contrast, leaves a gap: with three
+ people tied for second, the next person comes in fifth.
+
+ This is equivalent to the RANK function in SQL.""",
+ 'cumeDist':
+ """returns the cumulative distribution of values within a window partition,
+ i.e. the fraction of rows that are at or below the current row.
+
+ This is equivalent to the CUME_DIST function in SQL.""",
+ 'percentRank':
+ """returns the relative rank (i.e. percentile) of rows within a window partition.
+
+ This is equivalent to the PERCENT_RANK function in SQL.""",
+}
+
for _name, _doc in _functions.items():
globals()[_name] = since(1.3)(_create_function(_name, _doc))
for _name, _doc in _functions_1_4.items():
globals()[_name] = since(1.4)(_create_function(_name, _doc))
for _name, _doc in _binary_mathfunctions.items():
globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
+for _name, _doc in _window_functions.items():
+ globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
del _name, _doc
__all__ += _functions.keys()
+__all__ += _functions_1_4.keys()
__all__ += _binary_mathfunctions.keys()
+__all__ += _window_functions.keys()
__all__.sort()
@@ -177,27 +231,6 @@ def approxCountDistinct(col, rsd=None):
@since(1.4)
-def explode(col):
- """Returns a new row for each element in the given array or map.
-
- >>> from pyspark.sql import Row
- >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
- >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
- [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
-
- >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
- +---+-----+
- |key|value|
- +---+-----+
- | a| b|
- +---+-----+
- """
- sc = SparkContext._active_spark_context
- jc = sc._jvm.functions.explode(_to_java_column(col))
- return Column(jc)
-
-
-@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
@@ -250,6 +283,27 @@ def countDistinct(col, *cols):
@since(1.4)
+def explode(col):
+ """Returns a new row for each element in the given array or map.
+
+ >>> from pyspark.sql import Row
+ >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+ >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
+ [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
+
+ >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+ +---+-----+
+ |key|value|
+ +---+-----+
+ | a| b|
+ +---+-----+
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.explode(_to_java_column(col))
+ return Column(jc)
+
+
+@since(1.4)
def monotonicallyIncreasingId():
"""A column that generates monotonically increasing 64-bit integers.
@@ -258,7 +312,7 @@ def monotonicallyIncreasingId():
within each partition in the lower 33 bits. The assumption is that the data frame has
less than 1 billion partitions, and each partition has less than 8 billion records.
- As an example, consider a [[DataFrame]] with two partitions, each with 3 records.
+ As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
This expression would return the following IDs:
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
@@ -349,6 +403,55 @@ def when(condition, value):
return Column(jc)
+@since(1.4)
+def lag(col, count=1, default=None):
+ """
+ Window function: returns the value that is `count` rows before the current row, and
+ `default` if there are fewer than `count` rows before the current row. For example,
+ a `count` of one will return the previous row at any given point in the window partition.
+
+ This is equivalent to the LAG function in SQL.
+
+ :param col: name of column or expression
+ :param count: number of rows to look back
+ :param default: default value
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.lag(_to_java_column(col), count, default))
+
+
+@since(1.4)
+def lead(col, count=1, default=None):
+ """
+ Window function: returns the value that is `count` rows after the current row, and
+ `default` if there are fewer than `count` rows after the current row. For example,
+ a `count` of one will return the next row at any given point in the window partition.
+
+ This is equivalent to the LEAD function in SQL.
+
+ :param col: name of column or expression
+ :param count: number of rows to look ahead
+ :param default: default value
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.lead(_to_java_column(col), count, default))
+
+
+@since(1.4)
+def ntile(n):
+ """
+ Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+ partition, dividing the rows into `n` roughly equal buckets. For example, if `n` is 4, the
+ first quarter of the rows will get 1, the second quarter will get 2, and so on.
+
+ This is equivalent to the NTILE function in SQL.
+
+ :param n: an integer
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.ntile(int(n)))
+
+
class UserDefinedFunction(object):
"""
User defined function in Python
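
A hedged sketch of the new lag/lead/ntile helpers above. The sqlCtx session and the per-user event rows are assumptions for illustration; a HiveContext is required in 1.4.

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Illustrative per-user event stream: (user, ts)
    events = sqlCtx.createDataFrame(
        [("u1", 1), ("u1", 3), ("u1", 7), ("u2", 2), ("u2", 4)], ["user", "ts"])

    w = Window.partitionBy("user").orderBy("ts")

    events.select(
        "user", "ts",
        F.lag("ts", 1).over(w).alias("prev_ts"),   # None (default) on the first row per user
        F.lead("ts", 1).over(w).alias("next_ts"),  # None (default) on the last row per user
        F.ntile(2).over(w).alias("half"),          # splits each user's rows into 2 buckets
    ).show()
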
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 4da472a577..5a37a673ee 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -49,6 +49,8 @@ class GroupedData(object):
A set of methods for aggregations on a :class:`DataFrame`,
created by :func:`DataFrame.groupBy`.
+ .. note:: Experimental
+
.. versionadded:: 1.3
"""
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7e34996241..5c53c3a8ed 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -44,6 +44,7 @@ from pyspark.sql.types import *
from pyspark.sql.types import UserDefinedType, _infer_type
from pyspark.tests import ReusedPySparkTestCase
from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.window import Window
class ExamplePointUDT(UserDefinedType):
@@ -743,11 +744,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
- cls.sqlCtx = None
- return
+ raise unittest.SkipTest("Hive is not available")
except TypeError:
- cls.sqlCtx = None
- return
+ raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
@@ -761,9 +760,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
def test_save_and_load_table(self):
- if self.sqlCtx is None:
- return # no hive available, skipped
-
df = self.df
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
@@ -805,6 +801,27 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
+ def test_window_functions(self):
+ df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+ w = Window.partitionBy("value").orderBy("key")
+ from pyspark.sql import functions as F
+ sel = df.select(df.value, df.key,
+ F.max("key").over(w.rowsBetween(0, 1)),
+ F.min("key").over(w.rowsBetween(0, 1)),
+ F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+ F.rowNumber().over(w),
+ F.rank().over(w),
+ F.denseRank().over(w),
+ F.ntile(2).over(w))
+ rs = sorted(sel.collect())
+ expected = [
+ ("1", 1, 1, 1, 1, 1, 1, 1, 1),
+ ("2", 1, 1, 1, 3, 1, 1, 1, 1),
+ ("2", 1, 2, 1, 3, 2, 1, 1, 1),
+ ("2", 2, 2, 2, 3, 3, 3, 2, 2)
+ ]
+ for r, ex in zip(rs, expected):
+ self.assertEqual(tuple(r), ex[:len(r)])
if __name__ == "__main__":
unittest.main()
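
Note on the unbounded frame used in test_window_functions above: rowsBetween (see window.py below) maps any start at or below -sys.maxsize and any end at or above sys.maxsize to the Java Long extremes, so float('-inf')/float('inf') and ±sys.maxsize are interchangeable spellings of UNBOUNDED PRECEDING/FOLLOWING. A small sketch; df and its columns are assumptions for illustration.

    import sys

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy("value").orderBy("key")

    # Both frames below cover the whole partition.
    whole_a = w.rowsBetween(-sys.maxsize, sys.maxsize)
    whole_b = w.rowsBetween(float("-inf"), float("inf"))

    df.select(F.count("key").over(whole_a).alias("cnt_a"),
              F.count("key").over(whole_b).alias("cnt_b"))
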
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
new file mode 100644
index 0000000000..0a0e006bdf
--- /dev/null
+++ b/python/pyspark/sql/window.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.sql import since
+from pyspark.sql.column import _to_seq, _to_java_column
+
+__all__ = ["Window", "WindowSpec"]
+
+
+def _to_java_cols(cols):
+ sc = SparkContext._active_spark_context
+ if len(cols) == 1 and isinstance(cols[0], list):
+ cols = cols[0]
+ return _to_seq(sc, cols, _to_java_column)
+
+
+class Window(object):
+
+ """
+ Utility functions for defining windows in DataFrames.
+
+ For example:
+
+ >>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ >>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)
+
+ >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
+ >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
+
+ .. note:: Experimental
+
+ .. versionadded:: 1.4
+ """
+ @staticmethod
+ @since(1.4)
+ def partitionBy(*cols):
+ """
+ Creates a :class:`WindowSpec` with the partitioning defined.
+ """
+ sc = SparkContext._active_spark_context
+ jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
+ return WindowSpec(jspec)
+
+ @staticmethod
+ @since(1.4)
+ def orderBy(*cols):
+ """
+ Creates a :class:`WindowSpec` with the ordering defined.
+ """
+ sc = SparkContext._active_spark_context
+ jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
+ return WindowSpec(jspec)
+
+
+class WindowSpec(object):
+ """
+ A window specification that defines the partitioning, ordering,
+ and frame boundaries.
+
+ Use the static methods in :class:`Window` to create a :class:`WindowSpec`.
+
+ .. note:: Experimental
+
+ .. versionadded:: 1.4
+ """
+
+ _JAVA_MAX_LONG = (1 << 63) - 1
+ _JAVA_MIN_LONG = - (1 << 63)
+
+ def __init__(self, jspec):
+ self._jspec = jspec
+
+ @since(1.4)
+ def partitionBy(self, *cols):
+ """
+ Defines the partitioning columns in a :class:`WindowSpec`.
+
+ :param cols: names of columns or expressions
+ """
+ return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))
+
+ @since(1.4)
+ def orderBy(self, *cols):
+ """
+ Defines the ordering columns in a :class:`WindowSpec`.
+
+ :param cols: names of columns or expressions
+ """
+ return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))
+
+ @since(1.4)
+ def rowsBetween(self, start, end):
+ """
+ Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+
+ Both `start` and `end` are relative positions from the current row.
+ For example, "0" means "current row", while "-1" means the row before
+ the current row, and "5" means the fifth row after the current row.
+
+ :param start: boundary start, inclusive.
+ The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ :param end: boundary end, inclusive.
+ The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ """
+ if start <= -sys.maxsize:
+ start = self._JAVA_MIN_LONG
+ if end >= sys.maxsize:
+ end = self._JAVA_MAX_LONG
+ return WindowSpec(self._jspec.rowsBetween(start, end))
+
+ @since(1.4)
+ def rangeBetween(self, start, end):
+ """
+ Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+
+ Both `start` and `end` are relative to the value of the current row's ordering expression.
+ For example, "0" means "current row", while "-1" means one off before the current row,
+ and "5" means five off after the current row.
+
+ :param start: boundary start, inclusive.
+ The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+ :param end: boundary end, inclusive.
+ The frame is unbounded if this is ``sys.maxsize`` (or higher).
+ """
+ if start <= -sys.maxsize:
+ start = self._JAVA_MIN_LONG
+ if end >= sys.maxsize:
+ end = self._JAVA_MAX_LONG
+ return WindowSpec(self._jspec.rangeBetween(start, end))
+
+
+def _test():
+ import doctest
+ SparkContext('local[4]', 'PythonTest')
+ (failure_count, test_count) = doctest.testmod()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
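
To make the rowsBetween/rangeBetween distinction above concrete: rowsBetween offsets count physical rows around the current row, while rangeBetween offsets apply to the value of the ORDER BY expression. A hedged sketch; the dept/salary DataFrame is an illustrative assumption.

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy("dept").orderBy("salary")

    # Sum over the current row and the one physical row before it.
    rows_frame = w.rowsBetween(-1, 0)

    # Sum over every row whose salary is within 1000 of the current row's salary.
    range_frame = w.rangeBetween(-1000, 1000)

    df.select("dept", "salary",
              F.sum("salary").over(rows_frame).alias("rows_sum"),
              F.sum("salary").over(range_frame).alias("range_sum"))
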