From 9543fc0e08a21680961689ea772441c49fcd52ee Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 5 Apr 2017 16:03:04 -0700 Subject: [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState ## What changes were proposed in this pull request? - Fixed bug in Java API not passing timeout conf to scala API - Updated markdown docs - Updated scala docs - Added scala and Java example ## How was this patch tested? Manually ran examples. Author: Tathagata Das Closes #17539 from tdas/SPARK-20224. --- docs/structured-streaming-programming-guide.md | 98 +++++++++++++++++++++++--- 1 file changed, 89 insertions(+), 9 deletions(-) (limited to 'docs/structured-streaming-programming-guide.md') diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b5cf9f1644..37a1d6189a 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1,6 +1,6 @@ --- layout: global -displayTitle: Structured Streaming Programming Guide [Alpha] +displayTitle: Structured Streaming Programming Guide [Experimental] title: Structured Streaming Programming Guide --- @@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat +### Streaming Deduplication +You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. + +- *With watermark* - If there is a upper bound on how late a duplicate record may arrive, then you can define a watermark on a event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain. + +- *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state. + +
+
+ +{% highlight scala %} +val streamingDf = spark.readStream. ... // columns: guid, eventTime, ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime") +{% endhighlight %} + +
+
+ +{% highlight java %} +Dataset streamingDf = spark.readStream. ...; // columns: guid, eventTime, ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid"); + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime"); +{% endhighlight %} + + +
+
+ +{% highlight python %} +streamingDf = spark.readStream. ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf \ + .withWatermark("eventTime", "10 seconds") \ + .dropDuplicates("guid", "eventTime") +{% endhighlight %} + +
+
+ +### Arbitrary Stateful Operations +Many uscases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). + ### Unsupported Operations There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows. @@ -891,7 +950,7 @@ Some of them are as follows. + Right outer join with a streaming Dataset on the left is not supported -- Any kind of joins between two streaming Datasets are not yet supported. +- Any kind of joins between two streaming Datasets is not yet supported. In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that). @@ -951,13 +1010,6 @@ Here is the compatibility matrix. Supported Output Modes Notes - - Queries without aggregation - Append, Update - - Complete mode not supported as it is infeasible to keep all data in the Result Table. - - Queries with aggregation Aggregation on event-time with watermark @@ -986,6 +1038,33 @@ Here is the compatibility matrix. this mode. + + Queries with mapGroupsWithState + Update + + + + Queries with flatMapGroupsWithState + Append operation mode + Append + + Aggregations are allowed after flatMapGroupsWithState. + + + + Update operation mode + Update + + Aggregations not allowed after flatMapGroupsWithState. + + + + Other queries + Append, Update + + Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table. + + @@ -994,6 +1073,7 @@ Here is the compatibility matrix. + #### Output Sinks There are a few types of built-in output sinks. -- cgit v1.2.3