diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-04-20 10:32:01 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-04-20 10:32:01 -0700 |
commit | 80bf48f437939ddc3bb82c8c7530c8ae419f8427 (patch) | |
tree | e9be7bd9acac75d677eaad8ba69c84890d4913d3 /sql | |
parent | 834277884fcdaab4758604272881ffb2369e25f0 (diff) | |
download | spark-80bf48f437939ddc3bb82c8c7530c8ae419f8427.tar.gz spark-80bf48f437939ddc3bb82c8c7530c8ae419f8427.tar.bz2 spark-80bf48f437939ddc3bb82c8c7530c8ae419f8427.zip |
[SPARK-14555] First cut of Python API for Structured Streaming
## What changes were proposed in this pull request?
This patch provides a first cut of Python APIs for structured streaming. This PR provides the new classes:
- ContinuousQuery
- Trigger
- ProcessingTime
in pyspark under `pyspark.sql.streaming`.
In addition, it contains the new methods added under:
- `DataFrameWriter`
a) `startStream`
b) `trigger`
c) `queryName`
- `DataFrameReader`
a) `stream`
- `DataFrame`
a) `isStreaming`
This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
- `exception`
- `sourceStatuses`
- `sinkStatus`
They may be added in a follow up.
This PR also contains some very minor doc fixes on the Scala side.
## How was this patch tested?
Python doc tests
TODO:
- [ ] verify Python docs look good
Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>
Closes #12320 from brkyvz/stream-python.
Diffstat (limited to 'sql')
3 files changed, 19 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala index d9973b092d..953169b636 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala @@ -56,7 +56,7 @@ trait ContinuousQuery { * Returns current status of all the sources. * @since 2.0.0 */ - def sourceStatuses: Array[SourceStatus] + def sourceStatuses: Array[SourceStatus] /** Returns current status of the sink. */ def sinkStatus: SinkStatus @@ -77,7 +77,7 @@ trait ContinuousQuery { /** * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be throw. + * If the query has terminated with an exception, then the exception will be thrown. * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` * milliseconds. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1deeb8a2d2..0745ef47ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -85,18 +85,18 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * Scala Example: * {{{ - * def.writer.trigger(ProcessingTime("10 seconds")) + * df.write.trigger(ProcessingTime("10 seconds")) * * import scala.concurrent.duration._ - * def.writer.trigger(ProcessingTime(10.seconds)) + * df.write.trigger(ProcessingTime(10.seconds)) * }}} * * Java Example: * {{{ - * def.writer.trigger(ProcessingTime.create("10 seconds")) + * df.write.trigger(ProcessingTime.create("10 seconds")) * * import java.util.concurrent.TimeUnit - * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * df.write.trigger(ProcessingTime.create(10, 
TimeUnit.SECONDS)) * }}} * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala index c4e54b3f90..256e8a47a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala @@ -35,23 +35,23 @@ sealed trait Trigger {} /** * :: Experimental :: - * A trigger that runs a query periodically based on the processing time. If `intervalMs` is 0, + * A trigger that runs a query periodically based on the processing time. If `interval` is 0, * the query will run as fast as possible. * * Scala Example: * {{{ - * def.writer.trigger(ProcessingTime("10 seconds")) + * df.write.trigger(ProcessingTime("10 seconds")) * * import scala.concurrent.duration._ - * def.writer.trigger(ProcessingTime(10.seconds)) + * df.write.trigger(ProcessingTime(10.seconds)) * }}} * * Java Example: * {{{ - * def.writer.trigger(ProcessingTime.create("10 seconds")) + * df.write.trigger(ProcessingTime.create("10 seconds")) * * import java.util.concurrent.TimeUnit - * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} */ @Experimental @@ -67,11 +67,11 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { object ProcessingTime { /** - * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. * * Example: * {{{ - * def.writer.trigger(ProcessingTime("10 seconds")) + * df.write.trigger(ProcessingTime("10 seconds")) * }}} */ def apply(interval: String): ProcessingTime = { @@ -94,12 +94,12 @@ object ProcessingTime { } /** - * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. 
* * Example: * {{{ * import scala.concurrent.duration._ - * def.writer.trigger(ProcessingTime(10.seconds)) + * df.write.trigger(ProcessingTime(10.seconds)) * }}} */ def apply(interval: Duration): ProcessingTime = { @@ -107,11 +107,11 @@ object ProcessingTime { } /** - * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. * * Example: * {{{ - * def.writer.trigger(ProcessingTime.create("10 seconds")) + * df.write.trigger(ProcessingTime.create("10 seconds")) * }}} */ def create(interval: String): ProcessingTime = { @@ -119,12 +119,12 @@ object ProcessingTime { } /** - * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. * * Example: * {{{ * import java.util.concurrent.TimeUnit - * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} */ def create(interval: Long, unit: TimeUnit): ProcessingTime = { |