path: root/docs
authorTathagata Das <tathagata.das1565@gmail.com>2017-04-05 16:03:04 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-04-05 16:03:04 -0700
commit9543fc0e08a21680961689ea772441c49fcd52ee (patch)
tree3907a5efd012da783b7d188af07083a85bde45f7 /docs
parente2773996b8d1c0214d9ffac634a059b4923caf7b (diff)
[SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState
## What changes were proposed in this pull request?

- Fixed bug in Java API not passing timeout conf to Scala API
- Updated markdown docs
- Updated Scala docs
- Added Scala and Java examples

## How was this patch tested?

Manually ran examples.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17539 from tdas/SPARK-20224.
Diffstat (limited to 'docs')
-rw-r--r-- docs/structured-streaming-programming-guide.md | 98
1 file changed, 89 insertions(+), 9 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b5cf9f1644..37a1d6189a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1,6 +1,6 @@
---
layout: global
-displayTitle: Structured Streaming Programming Guide [Alpha]
+displayTitle: Structured Streaming Programming Guide [Experimental]
title: Structured Streaming Programming Guide
---
@@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
</div>
+### Streaming Deduplication
+You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
+
+- *With watermark* - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any more duplicates. This bounds the amount of state the query has to maintain.
+
+- *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
+
+// Without watermark using guid column
+streamingDf.dropDuplicates("guid")
+
+// With watermark using guid and eventTime columns
+streamingDf
+ .withWatermark("eventTime", "10 seconds")
+ .dropDuplicates("guid", "eventTime")
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+Dataset<Row> streamingDf = spark.readStream. ...; // columns: guid, eventTime, ...
+
+// Without watermark using guid column
+streamingDf.dropDuplicates("guid");
+
+// With watermark using guid and eventTime columns
+streamingDf
+ .withWatermark("eventTime", "10 seconds")
+ .dropDuplicates("guid", "eventTime");
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# Without watermark using guid column
+streamingDf.dropDuplicates("guid")
+
+# With watermark using guid and eventTime columns
+streamingDf \
+  .withWatermark("eventTime", "10 seconds") \
+  .dropDuplicates("guid", "eventTime")
+{% endhighlight %}
+
+</div>
+</div>
+
+### Arbitrary Stateful Operations
+Many use cases require more advanced stateful operations than aggregations. For example, in many use cases, you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
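+
+As a rough illustration, a running per-user event count with `mapGroupsWithState` might look like this in Scala. This is a minimal sketch, not a full sessionization: the `Event` class and its `userId` field are hypothetical, and `events` is assumed to be a streaming `Dataset[Event]`.
+
+{% highlight scala %}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
+
+// events: streaming Dataset[Event], where Event has a userId field (hypothetical)
+val userCounts = events
+  .groupByKey(event => event.userId)
+  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
+    (userId: String, newEvents: Iterator[Event], state: GroupState[Int]) =>
+      // Add the events received in this trigger to the count kept as state
+      val updatedCount = state.getOption.getOrElse(0) + newEvents.size
+      state.update(updatedCount)
+      (userId, updatedCount)
+  }
+{% endhighlight %}
+
+Note that queries with `mapGroupsWithState` must use the Update output mode, as shown in the compatibility matrix below.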
+
### Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
@@ -891,7 +950,7 @@ Some of them are as follows.
+ Right outer join with a streaming Dataset on the left is not supported
-- Any kind of joins between two streaming Datasets are not yet supported.
+- Any kind of join between two streaming Datasets is not yet supported.
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
@@ -952,13 +1011,6 @@ Here is the compatibility matrix.
<th>Notes</th>
</tr>
<tr>
- <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
- <td style="vertical-align: middle;">Append, Update</td>
- <td style="vertical-align: middle;">
- Complete mode not supported as it is infeasible to keep all data in the Result Table.
- </td>
- </tr>
- <tr>
<td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td>
<td style="vertical-align: middle;">Aggregation on event-time with watermark</td>
<td style="vertical-align: middle;">Append, Update, Complete</td>
@@ -987,6 +1039,33 @@ Here is the compatibility matrix.
</td>
</tr>
<tr>
+ <td colspan="2" style="vertical-align: middle;">Queries with <code>mapGroupsWithState</code></td>
+ <td style="vertical-align: middle;">Update</td>
+ <td style="vertical-align: middle;"></td>
+ </tr>
+ <tr>
+ <td rowspan="2" style="vertical-align: middle;">Queries with <code>flatMapGroupsWithState</code></td>
+ <td style="vertical-align: middle;">Append operation mode</td>
+ <td style="vertical-align: middle;">Append</td>
+ <td style="vertical-align: middle;">
+ Aggregations are allowed after <code>flatMapGroupsWithState</code>.
+ </td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Update operation mode</td>
+ <td style="vertical-align: middle;">Update</td>
+ <td style="vertical-align: middle;">
+ Aggregations not allowed after <code>flatMapGroupsWithState</code>.
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2" style="vertical-align: middle;">Other queries</td>
+ <td style="vertical-align: middle;">Append, Update</td>
+ <td style="vertical-align: middle;">
+ Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
+ </td>
+ </tr>
+ <tr>
<td></td>
<td></td>
<td></td>
@@ -994,6 +1073,7 @@ Here is the compatibility matrix.
</tr>
</table>
+
#### Output Sinks
There are a few types of built-in output sinks.