author Tathagata Das <tathagata.das1565@gmail.com> 2016-06-29 23:38:19 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2016-06-29 23:38:19 -0700
commit 2c3d96134dcc0428983eea087db7e91072215aea
tree 08be19dc09429a0052cf78aac28bc5dd9b340613
parent dedbceec1ef33ccd88101016de969a1ef3e3e142
[SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #13978 from tdas/SPARK-16256-1.
Diffstat (limited to 'docs/structured-streaming-programming-guide.md')
-rw-r--r-- docs/structured-streaming-programming-guide.md | 44
1 files changed, 23 insertions, 21 deletions
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 9ed06be62b..593256603f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -459,7 +459,7 @@ val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the parquet files
- .csv("/path/to/directory") // Equivalent to format("cv").load("/path/to/directory")
+ .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // Specify schema of the parquet files
- .csv("/path/to/directory"); // Equivalent to format("cv").load("/path/to/directory")
+ .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -513,7 +513,7 @@ csvDF = spark \
.readStream() \
.option("sep", ";") \
.schema(userSchema) \
- .csv("/path/to/directory") # Equivalent to format("cv").load("/path/to/directory")
+ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -522,10 +522,10 @@ csvDF = spark \
These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.
## Operations on streaming DataFrames/Datasets
-You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.
+You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use.
### Basic Operations - Selection, Projection, Aggregation
-Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.
+Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -618,7 +618,7 @@ df.groupBy("type").count()
</div>
### Window Operations on Event Time
-Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration.
+Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration.
Imagine the quick example is modified and the stream contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).
@@ -680,7 +680,7 @@ df.groupBy("type", window("time", "1 minute")).avg("signal")
Now consider what happens if one of the events arrives late to the application.
For example, a word that was generated at 12:04 but it was received at 12:11.
-Since this windowing is based on the time in the data, the time 12:04 should considered for windowing. This occurs naturally in our window-based grouping --the late data is automatically placed in the proper windows and the correct aggregates updated as illustrated below.
+Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping - the late data is automatically placed in the proper windows and the correct aggregates updated as illustrated below.
![Handling Late Data](img/structured-streaming-late-data.png)
@@ -724,23 +724,25 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
### Unsupported Operations
-However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. As of Spark 2.0, some of the unsupported operations are as follows
+However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting is not supported on the input streaming Dataset, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. As of Spark 2.0, some of the unsupported operations are as follows
-- Multiple aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported
+- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
-- Limit and take first N rows are not supported
+- Limit and take first N rows are not supported on streaming Datasets.
-- Distinct and sorting operations are not supported
+- Distinct operations on streaming Datasets are not supported.
-- Stream-batch outer joins are conditionally supported
+- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
- + Full outer join not allowed
+- Outer joins between a streaming and a static Datasets are conditionally supported.
- + Left outer join with a streaming DF on the left is not supported
+ + Full outer join with a streaming Dataset is not supported
- + Right outer join with a streaming DF on the right is not supported
+ + Left outer join with a streaming Dataset on the left is not supported
-- Stream-stream joins are not yet supported
+ + Right outer join with a streaming Dataset on the right is not supported
+
+- Any kind of joins between two streaming Datasets are not yet supported.
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not makes sense on a streaming Dataset. Rather those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
@@ -753,7 +755,7 @@ In addition, there are some Dataset methods that will not work on streaming Data
If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
## Starting Streaming Queries
-Once you have defined the final result DataFrame/Dataset, all that is left is for you start the StreamingQuery. To do that, you have to use the
+Once you have defined the final result DataFrame/Dataset, all that is left is for you start the streaming computation. To do that, you have to use the
`DataStreamWriter` (
[Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/
[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/
@@ -867,7 +869,7 @@ aggDF
.format("memory")
.start()
-spark.sql("select * from aggregates).show() // interactively query in-memory table
+spark.sql("select * from aggregates").show() // interactively query in-memory table
{% endhighlight %}
</div>
@@ -907,7 +909,7 @@ aggDF
.format("memory")
.start();
-spark.sql("select * from aggregates).show(); // interactively query in-memory table
+spark.sql("select * from aggregates").show(); // interactively query in-memory table
{% endhighlight %}
</div>
@@ -947,7 +949,7 @@ aggDF\
.format("memory")\
.start()
-spark.sql("select * from aggregates).show() # interactively query in-memory table
+spark.sql("select * from aggregates").show() # interactively query in-memory table
{% endhighlight %}
</div>
@@ -1144,7 +1146,7 @@ aggDF\
- Examples: See and run the
[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)
examples.
-- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming(https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
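
Reviewer note on the "Window Operations on Event Time" hunk: the patched paragraph says a word received at 12:07 should increment both the 12:00 - 12:10 and 12:05 - 12:15 windows. The sketch below is a plain-Python illustration of that assignment rule, not Spark's implementation. The function name `sliding_windows`, the half-open `[start, end)` interval convention, and the alignment of window starts to the Unix epoch are all assumptions made for the example.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, window, slide):
    """Return every [start, end) window that contains event_time.

    Hypothetical helper for illustration only. Window starts are assumed
    to fall on multiples of `slide` counted from the Unix epoch.
    """
    epoch = datetime(1970, 1, 1)
    # Latest window start that is not after the event time.
    start = event_time - ((event_time - epoch) % slide)
    windows = []
    # Walk backwards while the window still covers the event time.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


# The 12:07 event from the guide, with 10 minute windows sliding every 5 minutes.
for start, end in sliding_windows(
    datetime(2016, 6, 29, 12, 7),
    window=timedelta(minutes=10),
    slide=timedelta(minutes=5),
):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
```

Under the same assumptions, the late event from the next hunk (generated at 12:04, received at 12:11) is assigned by its event time alone, so it lands in the 11:55 - 12:05 and 12:00 - 12:10 windows regardless of when it arrives.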