author     Burak Yavuz <brkyvz@gmail.com>  2016-04-20 10:32:01 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-04-20 10:32:01 -0700
commit     80bf48f437939ddc3bb82c8c7530c8ae419f8427 (patch)
tree       e9be7bd9acac75d677eaad8ba69c84890d4913d3 /python/pyspark/sql/streaming.py
parent     834277884fcdaab4758604272881ffb2369e25f0 (diff)
[SPARK-14555] First cut of Python API for Structured Streaming
## What changes were proposed in this pull request?

This patch provides a first cut of Python APIs for Structured Streaming. It adds the new classes:

- `ContinuousQuery`
- `Trigger`
- `ProcessingTime`

in pyspark under `pyspark.sql.streaming`. In addition, it contains the new methods added under:

- `DataFrameWriter`: a) `startStream`, b) `trigger`, c) `queryName`
- `DataFrameReader`: a) `stream`
- `DataFrame`: a) `isStreaming`

This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:

- `exception`
- `sourceStatuses`
- `sinkStatus`

They may be added in a follow-up. This PR also contains some very minor doc fixes on the Scala side.

## How was this patch tested?

Python doc tests

TODO:
- [ ] verify Python docs look good

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>

Closes #12320 from brkyvz/stream-python.
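Taken together, the new surface area suggests an end-to-end flow like the sketch below. This is an illustration, not code from the patch: the `sqlContext`, the source/sink formats, the paths, and the `checkpointLocation` option are placeholder assumptions.

```python
from pyspark.sql.streaming import ProcessingTime

# DataFrameReader.stream() returns a streaming DataFrame; format and path are placeholders.
df = sqlContext.read.format('text').stream('/tmp/streaming/input')
assert df.isStreaming  # new property: True for streaming DataFrames

# DataFrameWriter gains queryName(), trigger(), and startStream(); startStream()
# hands back a ContinuousQuery that keeps running in the background.
cq = (df.write
        .format('parquet')
        .option('checkpointLocation', '/tmp/streaming/checkpoint')  # assumed sink option
        .queryName('first_query')
        .trigger(ProcessingTime('5 seconds'))  # re-run the query every 5 seconds
        .startStream(path='/tmp/streaming/output'))

cq.awaitTermination(timeoutMs=10000)  # block up to 10s; returns a bool when a timeout is set
cq.stop()
```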
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--  python/pyspark/sql/streaming.py  124
1 file changed, 124 insertions, 0 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
new file mode 100644
index 0000000000..549561669f
--- /dev/null
+++ b/python/pyspark/sql/streaming.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+from pyspark import since
+
+__all__ = ["ContinuousQuery"]
+
+
+class ContinuousQuery(object):
+ """
+ A handle to a query that is executing continuously in the background as new data arrives.
+ All these methods are thread-safe.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, jcq, sqlContext):
+ self._jcq = jcq
+ self._sqlContext = sqlContext
+
+ @property
+ @since(2.0)
+ def name(self):
+ """The name of the continuous query.
+ """
+ return self._jcq.name()
+
+ @property
+ @since(2.0)
+ def isActive(self):
+ """Whether this continuous query is currently active or not.
+ """
+ return self._jcq.isActive()
+
+ @since(2.0)
+ def awaitTermination(self, timeoutMs=None):
+        """Waits for the termination of `this` query, either by :func:`stop` or by an
+        exception. If the query has terminated with an exception, then the exception will
+        be thrown. If `timeoutMs` is set, this method instead returns whether the query has
+        terminated within `timeoutMs` milliseconds.
+
+        If the query has terminated, then all subsequent calls to this method will either return
+        immediately (if the query was terminated by :func:`stop`), or throw the exception
+        immediately (if the query terminated with an exception).
+
+        throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+        """
+ if timeoutMs is not None:
+            if not isinstance(timeoutMs, int) or timeoutMs < 0:
+                raise ValueError("timeoutMs must be a non-negative integer. Got %s" % timeoutMs)
+ return self._jcq.awaitTermination(timeoutMs)
+ else:
+ return self._jcq.awaitTermination()
+
+ @since(2.0)
+ def processAllAvailable(self):
+        """Blocks until all available data in the source has been processed and committed to the
+        sink. This method is intended for testing. Note that in the case of continually arriving
+        data, this method may block forever. Additionally, this method is only guaranteed to
+        block until data that was synchronously appended to the stream source prior to its
+        invocation has been processed (i.e. `getOffset` must immediately reflect the addition).
+ """
+ return self._jcq.processAllAvailable()
+
+ @since(2.0)
+ def stop(self):
+ """Stop this continuous query.
+ """
+ self._jcq.stop()
+
+
+class Trigger(object):
+ """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.0
+ """
+
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def _to_java_trigger(self, sqlContext):
+        """Internal method to construct the trigger on the JVM.
+ """
+ pass
+
+
+class ProcessingTime(Trigger):
+    """A trigger that runs a query periodically based on processing time. If the interval is
+    0 (e.g. '0 seconds'), the query will run as fast as possible.
+
+ The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, interval):
+        if not isinstance(interval, str) or len(interval.strip()) == 0:
+            raise ValueError("interval must be a non-empty interval string, e.g. '2 seconds'.")
+ self.interval = interval
+
+ def _to_java_trigger(self, sqlContext):
+ return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval)
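For completeness, here is a sketch of the `ContinuousQuery` lifecycle the methods above define. The handle `cq` is assumed to come from a hypothetical `DataFrameWriter.startStream()` call; nothing below is code from the patch.

```python
# `cq` is assumed to be a ContinuousQuery returned by DataFrameWriter.startStream().
print(cq.name)      # query name as set via queryName()
print(cq.isActive)  # True while the query keeps running in the background

# In tests: block until all data currently in the source is processed and committed.
cq.processAllAvailable()

# Wait up to 30 seconds; returns True if the query terminated in time, False
# otherwise. A query that died with an exception surfaces that exception here.
if not cq.awaitTermination(timeoutMs=30000):
    cq.stop()  # after this, further awaitTermination() calls return immediately
```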