author    Davies Liu <davies@databricks.com>    2015-05-23 08:30:05 -0700
committer Yin Huai <yhuai@databricks.com>       2015-05-23 08:30:05 -0700
commit    efe3bfdf496aa6206ace2697e31dd4c0c3c824fb (patch)
tree      a6c0adbff3ff029c0e87ceff4180f6b3c99ea5ff /python/pyspark/sql/tests.py
parent    ad0badba1450295982738934da2cc121cde18213 (diff)
[SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates
1. ntile should take an integer as a parameter.
2. Added Python API (based on #6364).
3. Updated documentation of various DataFrame Python functions.

Author: Davies Liu <davies@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6374 from rxin/window-final and squashes the following commits:

69004c7 [Reynold Xin] Style fix.
288cea9 [Reynold Xin] Update documentation.
7cb8985 [Reynold Xin] Merge pull request #6364 from davies/window
66092b4 [Davies Liu] update docs
ed73cb4 [Reynold Xin] [SPARK-7322][SQL] Improve DataFrame window function documentation.
ef55132 [Davies Liu] Merge branch 'master' of github.com:apache/spark into window4
8936ade [Davies Liu] fix maxint in python 3
2649358 [Davies Liu] update docs
778e2c0 [Davies Liu] SPARK-7836 and SPARK-7822: Python API of window functions
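As context for the diff below, here is a minimal sketch of the Python window-function API this commit introduces, mirroring the calls exercised in the new test. The SparkContext setup, sample data, and column names are illustrative assumptions, not part of the commit:

from pyspark import SparkContext
from pyspark.sql import SQLContext, functions as F
from pyspark.sql.window import Window

sc = SparkContext("local", "window-demo")   # assumed local setup
sqlCtx = SQLContext(sc)
df = sqlCtx.createDataFrame([(1, "a"), (2, "a"), (3, "b")], ["key", "value"])

# Frame specification: rows within each "value" partition, ordered by "key".
w = Window.partitionBy("value").orderBy("key")
sel = df.select(df.key, df.value,
                F.rank().over(w),     # rank within the partition
                F.ntile(2).over(w))   # ntile takes a plain int (SPARK-7322)
sel.show()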
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py | 31
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7e34996241..5c53c3a8ed 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -44,6 +44,7 @@ from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
 from pyspark.tests import ReusedPySparkTestCase
 from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.window import Window


 class ExamplePointUDT(UserDefinedType):
@@ -743,11 +744,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         try:
             cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
         except py4j.protocol.Py4JError:
-            cls.sqlCtx = None
-            return
+            raise unittest.SkipTest("Hive is not available")
         except TypeError:
-            cls.sqlCtx = None
-            return
+            raise unittest.SkipTest("Hive is not available")
         os.unlink(cls.tempdir.name)
         _scala_HiveContext =\
             cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
@@ -761,9 +760,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         shutil.rmtree(cls.tempdir.name, ignore_errors=True)

     def test_save_and_load_table(self):
-        if self.sqlCtx is None:
-            return  # no hive available, skipped
-
         df = self.df
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
@@ -805,6 +801,27 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         shutil.rmtree(tmpPath)

+    def test_window_functions(self):
+        df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        w = Window.partitionBy("value").orderBy("key")
+        from pyspark.sql import functions as F
+        sel = df.select(df.value, df.key,
+                        F.max("key").over(w.rowsBetween(0, 1)),
+                        F.min("key").over(w.rowsBetween(0, 1)),
+                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+                        F.rowNumber().over(w),
+                        F.rank().over(w),
+                        F.denseRank().over(w),
+                        F.ntile(2).over(w))
+        rs = sorted(sel.collect())
+        expected = [
+            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
+            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
+            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
+            ("2", 2, 2, 2, 3, 3, 3, 2, 2)
+        ]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])


 if __name__ == "__main__":
     unittest.main()
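A note on the setUpClass change above: raising unittest.SkipTest from setUpClass marks every test in the class as skipped and reports it as such, whereas the old code returned early with cls.sqlCtx set to None and forced each test to guard against a missing Hive. A minimal sketch of the pattern, with a hypothetical class name and availability flag standing in for the Py4J HiveConf probe:

import unittest

HIVE_AVAILABLE = False  # stand-in for probing the JVM-side HiveConf as above

class HiveDependentTests(unittest.TestCase):  # hypothetical name
    @classmethod
    def setUpClass(cls):
        if not HIVE_AVAILABLE:
            # Every test in the class is skipped and reported as skipped,
            # instead of silently passing via an early return.
            raise unittest.SkipTest("Hive is not available")

    def test_needs_hive(self):
        self.assertTrue(True)

if __name__ == "__main__":
    unittest.main()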