aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/utils.py
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-04-28 15:22:28 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-04-28 15:22:28 -0700
commit78c8aaf849aadbb065730959e7c1b70bb58d69c9 (patch)
tree8795a2ad1c070ec852809d5f6e81c5bbd1c3afd8 /python/pyspark/sql/utils.py
parentd584a2b8ac57eff3bf230c760e5bda205c6ea747 (diff)
downloadspark-78c8aaf849aadbb065730959e7c1b70bb58d69c9.tar.gz
spark-78c8aaf849aadbb065730959e7c1b70bb58d69c9.tar.bz2
spark-78c8aaf849aadbb065730959e7c1b70bb58d69c9.zip
[SPARK-14555] Second cut of Python API for Structured Streaming
## What changes were proposed in this pull request? This PR adds Python APIs for: - `ContinuousQueryManager` - `ContinuousQueryException` The `ContinuousQueryException` is a very basic wrapper, it doesn't provide the functionality that the Scala side provides, but it follows the same pattern for `AnalysisException`. For `ContinuousQueryManager`, all APIs are provided except for registering listeners. This PR also attempts to fix test flakiness by stopping all active streams just before tests. ## How was this patch tested? Python Doc tests and unit tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #12673 from brkyvz/pyspark-cqm.
Diffstat (limited to 'python/pyspark/sql/utils.py')
-rw-r--r--python/pyspark/sql/utils.py8
1 files changed, 8 insertions, 0 deletions
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 7ea0e0d5c9..cb172d21f3 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -45,6 +45,12 @@ class IllegalArgumentException(CapturedException):
"""
+class ContinuousQueryException(CapturedException):
+ """
+ Exception that stopped a :class:`ContinuousQuery`.
+ """
+
+
def capture_sql_exception(f):
def deco(*a, **kw):
try:
@@ -57,6 +63,8 @@ def capture_sql_exception(f):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
+ if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
+ raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
if s.startswith('java.lang.IllegalArgumentException: '):
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
raise