diff options
author | Davies Liu <davies@databricks.com> | 2015-05-23 08:30:05 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-05-23 08:30:05 -0700 |
commit | efe3bfdf496aa6206ace2697e31dd4c0c3c824fb (patch) | |
tree | a6c0adbff3ff029c0e87ceff4180f6b3c99ea5ff /python/pyspark/sql/tests.py | |
parent | ad0badba1450295982738934da2cc121cde18213 (diff) | |
download | spark-efe3bfdf496aa6206ace2697e31dd4c0c3c824fb.tar.gz spark-efe3bfdf496aa6206ace2697e31dd4c0c3c824fb.tar.bz2 spark-efe3bfdf496aa6206ace2697e31dd4c0c3c824fb.zip |
[SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates
1. ntile should take an integer as parameter.
2. Added Python API (based on #6364)
3. Update documentation of various DataFrame Python functions.
Author: Davies Liu <davies@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes #6374 from rxin/window-final and squashes the following commits:
69004c7 [Reynold Xin] Style fix.
288cea9 [Reynold Xin] Update documentaiton.
7cb8985 [Reynold Xin] Merge pull request #6364 from davies/window
66092b4 [Davies Liu] update docs
ed73cb4 [Reynold Xin] [SPARK-7322][SQL] Improve DataFrame window function documentation.
ef55132 [Davies Liu] Merge branch 'master' of github.com:apache/spark into window4
8936ade [Davies Liu] fix maxint in python 3
2649358 [Davies Liu] update docs
778e2c0 [Davies Liu] SPARK-7836 and SPARK-7822: Python API of window functions
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r-- | python/pyspark/sql/tests.py | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 7e34996241..5c53c3a8ed 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -44,6 +44,7 @@ from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type from pyspark.tests import ReusedPySparkTestCase from pyspark.sql.functions import UserDefinedFunction +from pyspark.sql.window import Window class ExamplePointUDT(UserDefinedType): @@ -743,11 +744,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): try: cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() except py4j.protocol.Py4JError: - cls.sqlCtx = None - return + raise unittest.SkipTest("Hive is not available") except TypeError: - cls.sqlCtx = None - return + raise unittest.SkipTest("Hive is not available") os.unlink(cls.tempdir.name) _scala_HiveContext =\ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) @@ -761,9 +760,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(cls.tempdir.name, ignore_errors=True) def test_save_and_load_table(self): - if self.sqlCtx is None: - return # no hive available, skipped - df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) @@ -805,6 +801,27 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) + def test_window_functions(self): + df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) + w = Window.partitionBy("value").orderBy("key") + from pyspark.sql import functions as F + sel = df.select(df.value, df.key, + F.max("key").over(w.rowsBetween(0, 1)), + F.min("key").over(w.rowsBetween(0, 1)), + F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))), + F.rowNumber().over(w), + F.rank().over(w), + F.denseRank().over(w), + F.ntile(2).over(w)) + rs = sorted(sel.collect()) + expected = [ + ("1", 1, 1, 1, 1, 1, 1, 1, 1), + ("2", 1, 1, 1, 3, 1, 1, 1, 1), + ("2", 1, 2, 1, 3, 2, 1, 1, 1), + ("2", 2, 2, 2, 3, 3, 3, 2, 2) + ] + for r, ex in zip(rs, expected): + self.assertEqual(tuple(r), ex[:len(r)]) if __name__ == "__main__": unittest.main() |