author    Burak Yavuz <brkyvz@gmail.com>  2016-04-20 10:32:01 -0700
committer Michael Armbrust <michael@databricks.com>  2016-04-20 10:32:01 -0700
commit    80bf48f437939ddc3bb82c8c7530c8ae419f8427 (patch)
tree      e9be7bd9acac75d677eaad8ba69c84890d4913d3 /sql
parent    834277884fcdaab4758604272881ffb2369e25f0 (diff)
[SPARK-14555] First cut of Python API for Structured Streaming
## What changes were proposed in this pull request?

This patch provides a first cut of Python APIs for structured streaming.

This PR provides the new classes in pyspark under `pyspark.sql.streaming`:
- ContinuousQuery
- Trigger
- ProcessingTime

In addition, it contains the new methods added under:
- `DataFrameWriter`: `startStream`, `trigger`, `queryName`
- `DataFrameReader`: `stream`
- `DataFrame`: `isStreaming`

This PR doesn't contain all the methods exposed for `ContinuousQuery`, for example:
- `exception`
- `sourceStatuses`
- `sinkStatus`

They may be added in a follow-up. This PR also contains some very minor doc fixes on the Scala side.

## How was this patch tested?

Python doc tests

TODO:
- [ ] verify Python docs look good

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>

Closes #12320 from brkyvz/stream-python.
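For orientation, here is a minimal sketch of the existing Scala API that the new Python API mirrors. The input/output paths, the query name, and the `sqlContext` handle are assumptions for illustration, not part of this patch:

```scala
import org.apache.spark.sql.ProcessingTime

// Assumes a SQLContext named `sqlContext` and hypothetical paths.
val df = sqlContext.read
  .format("text")
  .stream("/tmp/streaming-input")        // mirrored in Python as DataFrameReader.stream
println(df.isStreaming)                  // true; mirrored as DataFrame.isStreaming

val query = df.write
  .trigger(ProcessingTime("10 seconds")) // mirrored as DataFrameWriter.trigger
  .queryName("example")                  // mirrored as DataFrameWriter.queryName
  .startStream("/tmp/streaming-output")  // mirrored as DataFrameWriter.startStream
```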
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala         | 26
3 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
index d9973b092d..953169b636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -56,7 +56,7 @@ trait ContinuousQuery {
* Returns current status of all the sources.
* @since 2.0.0
*/
- def sourceStatuses: Array[SourceStatus]
+ def sourceStatuses: Array[SourceStatus]
/** Returns current status of the sink. */
def sinkStatus: SinkStatus
@@ -77,7 +77,7 @@ trait ContinuousQuery {
/**
* Waits for the termination of `this` query, either by `query.stop()` or by an exception.
- * If the query has terminated with an exception, then the exception will be throw.
+ * If the query has terminated with an exception, then the exception will be thrown.
* Otherwise, it returns whether the query has terminated or not within the `timeoutMs`
* milliseconds.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1deeb8a2d2..0745ef47ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -85,18 +85,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* Scala Example:
* {{{
- * def.writer.trigger(ProcessingTime("10 seconds"))
+ * df.write.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
- * def.writer.trigger(ProcessingTime(10.seconds))
+ * df.write.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
- * def.writer.trigger(ProcessingTime.create("10 seconds"))
+ * df.write.trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
- * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
index c4e54b3f90..256e8a47a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
@@ -35,23 +35,23 @@ sealed trait Trigger {}
/**
* :: Experimental ::
- * A trigger that runs a query periodically based on the processing time. If `intervalMs` is 0,
+ * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*
* Scala Example:
* {{{
- * def.writer.trigger(ProcessingTime("10 seconds"))
+ * df.write.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
- * def.writer.trigger(ProcessingTime(10.seconds))
+ * df.write.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
- * def.writer.trigger(ProcessingTime.create("10 seconds"))
+ * df.write.trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
- * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*/
@Experimental
@@ -67,11 +67,11 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
object ProcessingTime {
/**
- * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+ * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
*
* Example:
* {{{
- * def.writer.trigger(ProcessingTime("10 seconds"))
+ * df.write.trigger(ProcessingTime("10 seconds"))
* }}}
*/
def apply(interval: String): ProcessingTime = {
@@ -94,12 +94,12 @@ object ProcessingTime {
}
/**
- * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+ * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
*
* Example:
* {{{
* import scala.concurrent.duration._
- * def.writer.trigger(ProcessingTime(10.seconds))
+ * df.write.trigger(ProcessingTime(10.seconds))
* }}}
*/
def apply(interval: Duration): ProcessingTime = {
@@ -107,11 +107,11 @@ object ProcessingTime {
}
/**
- * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+ * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
*
* Example:
* {{{
- * def.writer.trigger(ProcessingTime.create("10 seconds"))
+ * df.write.trigger(ProcessingTime.create("10 seconds"))
* }}}
*/
def create(interval: String): ProcessingTime = {
@@ -119,12 +119,12 @@ object ProcessingTime {
}
/**
- * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+ * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
*
* Example:
* {{{
* import java.util.concurrent.TimeUnit
- * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*/
def create(interval: Long, unit: TimeUnit): ProcessingTime = {
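For reference, a short sketch showing that the factory overloads documented in this file are interchangeable; nothing here goes beyond the `apply`/`create` signatures shown above:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.ProcessingTime

// All four forms construct the same 10-second trigger:
ProcessingTime("10 seconds")                 // apply(interval: String)
ProcessingTime(10.seconds)                   // apply(interval: Duration)
ProcessingTime.create("10 seconds")          // create(interval: String), for Java callers
ProcessingTime.create(10, TimeUnit.SECONDS)  // create(interval: Long, unit: TimeUnit)

// An interval of 0 means the query runs as fast as possible:
ProcessingTime(0L)                           // case class constructor, intervalMs: Long
```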