author: Tathagata Das <tathagata.das1565@gmail.com> 2017-04-05 16:03:04 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2017-04-05 16:03:04 -0700
commit: 9543fc0e08a21680961689ea772441c49fcd52ee
tree: 3907a5efd012da783b7d188af07083a85bde45f7 /sql/core/src
parent: e2773996b8d1c0214d9ffac634a059b4923caf7b
[SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState

## What changes were proposed in this pull request?

- Fixed bug in Java API not passing timeout conf to scala API
- Updated markdown docs
- Updated scala docs
- Added scala and Java example

## How was this patch tested?

Manually ran examples.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17539 from tdas/SPARK-20224.

Diffstat (limited to 'sql/core/src')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala | 15
2 files changed, 14 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 022c2f5629..cb42e9e456 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -347,7 +347,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
timeoutConf: GroupStateTimeout): Dataset[U] = {
- mapGroupsWithState[S, U](
+ mapGroupsWithState[S, U](timeoutConf)(
(key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
)(stateEncoder, outputEncoder)
}
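
The one-line change above forwards `timeoutConf` to the curried Scala overload instead of calling the overload that takes only the function. A minimal sketch of why the old call would silently drop the timeout, written in plain Scala with hypothetical stand-ins (`Timeout`, `mapWithState`, `javaStyleBuggy`, and `javaStyleFixed` are illustrative names, not Spark APIs), and assuming the overload without a timeout argument falls back to "no timeout":

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
sealed trait Timeout
case object NoTimeout extends Timeout
case object ProcessingTimeTimeout extends Timeout

object TimeoutForwarding {
  // Overload without a timeout argument: assumed to default to NoTimeout.
  def mapWithState[A, B](func: A => B): (Timeout, A => B) =
    mapWithState[A, B](NoTimeout)(func)

  // Curried overload that actually records the requested timeout.
  def mapWithState[A, B](timeout: Timeout)(func: A => B): (Timeout, A => B) =
    (timeout, func)

  // Shape of the Java-facing wrapper before the fix: it accepts `timeout`
  // but calls the overload that ignores it, so NoTimeout is used.
  def javaStyleBuggy[A, B](func: A => B, timeout: Timeout): Timeout =
    mapWithState[A, B](func)._1

  // Shape of the wrapper after the fix: `timeout` is passed through.
  def javaStyleFixed[A, B](func: A => B, timeout: Timeout): Timeout =
    mapWithState[A, B](timeout)(func)._1
}
```

Both wrappers compile because the two overloads differ only in their first parameter list, which is why a dropped argument like this may go unnoticed until a timeout fails to fire at runtime.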
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index 15df906ca7..c659ac7fcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
* `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger.
- * That is, in every batch of the `streaming.StreamingQuery`,
+ * That is, in every batch of the `StreamingQuery`,
* the function will be invoked once for each group that has data in the trigger. Furthermore,
* if timeout is set, then the function will invoked on timed out groups (more detail below).
*
@@ -42,12 +42,23 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
* - The key of the group.
* - An iterator containing all the values for this group.
* - A user-defined state object set by previous invocations of the given function.
+ *
* In case of a batch Dataset, there is only one invocation and state object will be empty as
* there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
* is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
* no effect.
*
- * Important points to note about the function.
+ * The major difference between `mapGroupsWithState` and `flatMapGroupsWithState` is that the
+ * former allows the function to return one and only one record, whereas the latter
+ * allows the function to return any number of records (including no records). Furthermore, the
+ * `flatMapGroupsWithState` is associated with an operation output mode, which can be either
+ * `Append` or `Update`. Semantically, this defines whether the output records of one trigger
+ * is effectively replacing the previously output records (from previous triggers) or is appending
+ * to the list of previously output records. Essentially, this defines how the Result Table (refer
+ * to the semantics in the programming guide) is updated, and allows us to reason about the
+ * semantics of later operations.
+ *
+ * Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState).
* - In a trigger, the function will be called only the groups present in the batch. So do not
* assume that the function will be called in every trigger for every group that has state.
* - There is no guaranteed ordering of values in the iterator in the function, neither with