aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorJames Thomas <jamesjoethomas@gmail.com>2016-07-11 17:57:51 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-07-11 17:57:51 -0700
commit9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4 (patch)
treeee8b872c7460ffa571e67b15533f2ba4f067a20f /examples
parentb4fbe140be158f576706f21fa69f40d6e14e4a43 (diff)
downloadspark-9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4.tar.gz
spark-9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4.tar.bz2
spark-9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4.zip
[SPARK-16114][SQL] structured streaming event time window example
## What changes were proposed in this pull request? A structured streaming example with event time windowing. ## How was this patch tested? Run locally Author: James Thomas <jamesjoethomas@gmail.com> Closes #13957 from jjthomas/current.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java4
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java116
-rw-r--r--examples/src/main/python/sql/streaming/structured_network_wordcount.py3
-rw-r--r--examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py102
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala103
6 files changed, 326 insertions, 4 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index a2cf938954..346d2182c7 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.Iterator;
/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
*
* Usage: JavaStructuredNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Structured Streaming
@@ -40,7 +40,7 @@ public final class JavaStructuredNetworkWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
- System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+ System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
System.exit(1);
}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
new file mode 100644
index 0000000000..557d36cff3
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
+ * [<slide duration>]
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ * <window duration> gives the size of window, specified as integer number of seconds
+ * <slide duration> gives the amount of time successive windows are offset from one another,
+ * given in the same units as above. <slide duration> should be less than or equal to
+ * <window duration>. If the two are equal, successive windows have no overlap. If
+ * <slide duration> is not provided, it defaults to <window duration>.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ * localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
+ *
+ * One recommended <window duration>, <slide duration> pair is 10, 5
+ */
+public final class JavaStructuredNetworkWordCountWindowed {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
+ " <window duration in seconds> [<slide duration in seconds>]");
+ System.exit(1);
+ }
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+ int windowSize = Integer.parseInt(args[2]);
+ int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
+ if (slideSize > windowSize) {
+ System.err.println("<slide duration> must be less than or equal to <window duration>");
+ }
+ String windowDuration = windowSize + " seconds";
+ String slideDuration = slideSize + " seconds";
+
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaStructuredNetworkWordCountWindowed")
+ .getOrCreate();
+
+ // Create DataFrame representing the stream of input lines from connection to host:port
+ Dataset<Tuple2<String, Timestamp>> lines = spark
+ .readStream()
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .option("includeTimestamp", true)
+ .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+
+ // Split the lines into words, retaining timestamps
+ Dataset<Row> words = lines.flatMap(
+ new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+ @Override
+ public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+ List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+ for (String word : t._1.split(" ")) {
+ result.add(new Tuple2<>(word, t._2));
+ }
+ return result.iterator();
+ }
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+ ).toDF("word", "timestamp");
+
+ // Group the data by window and word and compute the count of each group
+ Dataset<Row> windowedCounts = words.groupBy(
+ functions.window(words.col("timestamp"), windowDuration, slideDuration),
+ words.col("word")
+ ).count().orderBy("window");
+
+ // Start running the query that prints the windowed word counts to the console
+ StreamingQuery query = windowedCounts.writeStream()
+ .outputMode("complete")
+ .format("console")
+ .option("truncate", "false")
+ .start();
+
+ query.awaitTermination();
+ }
+}
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
index 32d63c52c9..afde255058 100644
--- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -16,7 +16,7 @@
#
"""
- Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Counts words in UTF8 encoded, '\n' delimited text received from the network.
Usage: structured_network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Structured Streaming
would connect to receive data.
@@ -58,6 +58,7 @@ if __name__ == "__main__":
# Split the lines into words
words = lines.select(
+ # explode turns each item in an array into a separate row
explode(
split(lines.value, ' ')
).alias('word')
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
new file mode 100644
index 0000000000..02a7d3363d
--- /dev/null
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
+ [<slide duration>]
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+ <window duration> gives the size of window, specified as integer number of seconds
+ <slide duration> gives the amount of time successive windows are offset from one another,
+ given in the same units as above. <slide duration> should be less than or equal to
+ <window duration>. If the two are equal, successive windows have no overlap. If
+ <slide duration> is not provided, it defaults to <window duration>.
+
+ To run this on your local machine, you need to first run a Netcat server
+ `$ nc -lk 9999`
+ and then run the example
+ `$ bin/spark-submit
+ examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+ localhost 9999 <window duration> [<slide duration>]`
+
+ One recommended <window duration>, <slide duration> pair is 10, 5
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+ if len(sys.argv) != 5 and len(sys.argv) != 4:
+ msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
+ "<window duration in seconds> [<slide duration in seconds>]")
+ print(msg, file=sys.stderr)
+ exit(-1)
+
+ host = sys.argv[1]
+ port = int(sys.argv[2])
+ windowSize = int(sys.argv[3])
+ slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
+ if slideSize > windowSize:
+ print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
+ windowDuration = '{} seconds'.format(windowSize)
+ slideDuration = '{} seconds'.format(slideSize)
+
+ spark = SparkSession\
+ .builder\
+ .appName("StructuredNetworkWordCountWindowed")\
+ .getOrCreate()
+
+ # Create DataFrame representing the stream of input lines from connection to host:port
+ lines = spark\
+ .readStream\
+ .format('socket')\
+ .option('host', host)\
+ .option('port', port)\
+ .option('includeTimestamp', 'true')\
+ .load()
+
+ # Split the lines into words, retaining timestamps
+ # split() splits each line into an array, and explode() turns the array into multiple rows
+ words = lines.select(
+ explode(split(lines.value, ' ')).alias('word'),
+ lines.timestamp
+ )
+
+ # Group the data by window and word and compute the count of each group
+ windowedCounts = words.groupBy(
+ window(words.timestamp, windowDuration, slideDuration),
+ words.word
+ ).count().orderBy('window')
+
+ # Start running the query that prints the windowed word counts to the console
+ query = windowedCounts\
+ .writeStream\
+ .outputMode('complete')\
+ .format('console')\
+ .option('truncate', 'false')\
+ .start()
+
+ query.awaitTermination()
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 433f7a181b..364bff227b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
*
* Usage: StructuredNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Structured Streaming
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
new file mode 100644
index 0000000000..333b0a9d24
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration>
+ * [<slide duration>]
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ * <window duration> gives the size of window, specified as integer number of seconds
+ * <slide duration> gives the amount of time successive windows are offset from one another,
+ * given in the same units as above. <slide duration> should be less than or equal to
+ * <window duration>. If the two are equal, successive windows have no overlap. If
+ * <slide duration> is not provided, it defaults to <window duration>.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ * localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
+ *
+ * One recommended <window duration>, <slide duration> pair is 10, 5
+ */
+object StructuredNetworkWordCountWindowed {
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
+ " <window duration in seconds> [<slide duration in seconds>]")
+ System.exit(1)
+ }
+
+ val host = args(0)
+ val port = args(1).toInt
+ val windowSize = args(2).toInt
+ val slideSize = if (args.length == 3) windowSize else args(3).toInt
+ if (slideSize > windowSize) {
+ System.err.println("<slide duration> must be less than or equal to <window duration>")
+ }
+ val windowDuration = s"$windowSize seconds"
+ val slideDuration = s"$slideSize seconds"
+
+ val spark = SparkSession
+ .builder
+ .appName("StructuredNetworkWordCountWindowed")
+ .getOrCreate()
+
+ import spark.implicits._
+
+ // Create DataFrame representing the stream of input lines from connection to host:port
+ val lines = spark.readStream
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .option("includeTimestamp", true)
+ .load().as[(String, Timestamp)]
+
+ // Split the lines into words, retaining timestamps
+ val words = lines.flatMap(line =>
+ line._1.split(" ").map(word => (word, line._2))
+ ).toDF("word", "timestamp")
+
+ // Group the data by window and word and compute the count of each group
+ val windowedCounts = words.groupBy(
+ window($"timestamp", windowDuration, slideDuration), $"word"
+ ).count().orderBy("window")
+
+ // Start running the query that prints the windowed word counts to the console
+ val query = windowedCounts.writeStream
+ .outputMode("complete")
+ .format("console")
+ .option("truncate", "false")
+ .start()
+
+ query.awaitTermination()
+ }
+}
+// scalastyle:on println