aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/tests.py
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-12-14 13:36:41 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-12-14 13:36:41 -0800
commit1ac6567bdb03d7cc5c5f3473827a102280cb1030 (patch)
treebe8018d4e321c590298727fdc73369c3dfcbbd71 /python/pyspark/sql/tests.py
parent5d799473696a15fddd54ec71a93b6f8cb169810c (diff)
downloadspark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.tar.gz
spark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.tar.bz2
spark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.zip
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request? Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError. This PR just makes it return null instead. ## How was this patch tested? `test("lastProgress should be null when recentProgress is empty")` Author: Shixiong Zhu <shixiong@databricks.com> Closes #16273 from zsxwing/SPARK-18852.
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--python/pyspark/sql/tests.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index af7d52cdac..6ddd804eec 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1128,9 +1128,25 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- q = df.writeStream \
+
+ def func(x):
+ time.sleep(1)
+ return x
+
+ from pyspark.sql.functions import col, udf
+ sleep_udf = udf(func)
+
+ # Use "sleep_udf" to delay the progress update so that we can test `lastProgress` when there
+ # were no updates.
+ q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
+ # "lastProgress" will return None in most cases. However, as it may be flaky when
+ # Jenkins is very slow, we don't assert it. If there is something wrong, "lastProgress"
+ # may throw error with a high chance and make this test flaky, so we should still be
+ # able to detect broken codes.
+ q.lastProgress
+
q.processAllAvailable()
lastProgress = q.lastProgress
recentProgress = q.recentProgress