diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-04-05 16:03:04 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-04-05 16:03:04 -0700 |
commit | 9543fc0e08a21680961689ea772441c49fcd52ee (patch) | |
tree | 3907a5efd012da783b7d188af07083a85bde45f7 /sql/core/src | |
parent | e2773996b8d1c0214d9ffac634a059b4923caf7b (diff) | |
download | spark-9543fc0e08a21680961689ea772441c49fcd52ee.tar.gz spark-9543fc0e08a21680961689ea772441c49fcd52ee.tar.bz2 spark-9543fc0e08a21680961689ea772441c49fcd52ee.zip |
[SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState
## What changes were proposed in this pull request?
- Fixed bug in Java API not passing timeout conf to scala API
- Updated markdown docs
- Updated scala docs
- Added scala and Java example
## How was this patch tested?
Manually ran examples.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17539 from tdas/SPARK-20224.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 2 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala | 15 |
2 files changed, 14 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 022c2f5629..cb42e9e456 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -347,7 +347,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( stateEncoder: Encoder[S], outputEncoder: Encoder[U], timeoutConf: GroupStateTimeout): Dataset[U] = { - mapGroupsWithState[S, U]( + mapGroupsWithState[S, U](timeoutConf)( (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s) )(stateEncoder, outputEncoder) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala index 15df906ca7..c659ac7fcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations. * For a static batch Dataset, the function will be invoked once per group. For a streaming * Dataset, the function will be invoked for each group repeatedly in every trigger. - * That is, in every batch of the `streaming.StreamingQuery`, + * That is, in every batch of the `StreamingQuery`, * the function will be invoked once for each group that has data in the trigger. Furthermore, * if timeout is set, then the function will invoked on timed out groups (more detail below). * @@ -42,12 +42,23 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState * - The key of the group. * - An iterator containing all the values for this group. * - A user-defined state object set by previous invocations of the given function. + * * In case of a batch Dataset, there is only one invocation and state object will be empty as * there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState` * is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have * no effect. * - * Important points to note about the function. + * The major difference between `mapGroupsWithState` and `flatMapGroupsWithState` is that the + * former allows the function to return one and only one record, whereas the latter + * allows the function to return any number of records (including no records). Furthermore, the + * `flatMapGroupsWithState` is associated with an operation output mode, which can be either + * `Append` or `Update`. Semantically, this defines whether the output records of one trigger + * is effectively replacing the previously output records (from previous triggers) or is appending + * to the list of previously output records. Essentially, this defines how the Result Table (refer + * to the semantics in the programming guide) is updated, and allows us to reason about the + * semantics of later operations. + * + * Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState). * - In a trigger, the function will be called only the groups present in the batch. So do not * assume that the function will be called in every trigger for every group that has state. * - There is no guaranteed ordering of values in the iterator in the function, neither with |