From 51a6706b1339bb761602e33276a469f71be2cd90 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 13 Jul 2016 13:26:23 -0700 Subject: [SPARK-16114][SQL] updated structured streaming guide ## What changes were proposed in this pull request? Updated structured streaming programming guide with new windowed example. ## How was this patch tested? Docs Author: James Thomas Closes #14183 from jjthomas/ss_docs_update. --- docs/structured-streaming-programming-guide.md | 49 ++++++++++++-------------- 1 file changed, 23 insertions(+), 26 deletions(-) (limited to 'docs') diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 79493968db..3ef39e4885 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -626,52 +626,49 @@ The result tables would look something like the following. ![Window Operations](img/structured-streaming-window.png) -Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. +Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. You can see the full code for the below examples in +[Scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala)/ +[Java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java)/ +[Python]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py).
{% highlight scala %} -// Number of events in every 1 minute time windows -df.groupBy(window(df.col("time"), "1 minute")) - .count() +import spark.implicits._ +val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } -// Average number of events for each device type in every 1 minute time windows -df.groupBy( - df.col("type"), - window(df.col("time"), "1 minute")) - .avg("signal") +// Group the data by window and word and compute the count of each group +val windowedCounts = words.groupBy( + window($"timestamp", "10 minutes", "5 minutes"), + $"word" +).count() {% endhighlight %}
{% highlight java %} -import static org.apache.spark.sql.functions.window; - -// Number of events in every 1 minute time windows -df.groupBy(window(df.col("time"), "1 minute")) - .count(); - -// Average number of events for each device type in every 1 minute time windows -df.groupBy( - df.col("type"), - window(df.col("time"), "1 minute")) - .avg("signal"); +Dataset words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } +// Group the data by window and word and compute the count of each group +Dataset windowedCounts = words.groupBy( + functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), + words.col("word") +).count(); {% endhighlight %}
{% highlight python %} -from pyspark.sql.functions import window - -# Number of events in every 1 minute time windows -df.groupBy(window("time", "1 minute")).count() +words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } -# Average number of events for each device type in every 1 minute time windows -df.groupBy("type", window("time", "1 minute")).avg("signal") +# Group the data by window and word and compute the count of each group +windowedCounts = words.groupBy( + window(words.timestamp, '10 minutes', '5 minutes'), + words.word +).count() {% endhighlight %}
-- cgit v1.2.3