diff options
Diffstat (limited to 'examples/src/main')
6 files changed, 326 insertions, 4 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index a2cf938954..346d2182c7 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.Iterator; /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in UTF8 encoded, '\n' delimited text received from the network. * * Usage: JavaStructuredNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Structured Streaming @@ -40,7 +40,7 @@ public final class JavaStructuredNetworkWordCount { public static void main(String[] args) throws Exception { if (args.length < 2) { - System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); + System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>"); System.exit(1); } diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java new file mode 100644 index 0000000000..557d36cff3 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql.streaming; + +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.streaming.StreamingQuery; +import scala.Tuple2; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a + * sliding window of configurable duration. Each line from the network is tagged + * with a timestamp that is used to determine the windows into which it falls. + * + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration> + * [<slide duration>] + * <hostname> and <port> describe the TCP server that Structured Streaming + * would connect to receive data. + * <window duration> gives the size of window, specified as integer number of seconds + * <slide duration> gives the amount of time successive windows are offset from one another, + * given in the same units as above. <slide duration> should be less than or equal to + * <window duration>. If the two are equal, successive windows have no overlap. If + * <slide duration> is not provided, it defaults to <window duration>. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed + * localhost 9999 <window duration in seconds> [<slide duration in seconds>]` + * + * One recommended <window duration>, <slide duration> pair is 10, 5 + */ +public final class JavaStructuredNetworkWordCountWindowed { + + public static void main(String[] args) throws Exception { + if (args.length < 3) { + System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" + + " <window duration in seconds> [<slide duration in seconds>]"); + System.exit(1); + } + + String host = args[0]; + int port = Integer.parseInt(args[1]); + int windowSize = Integer.parseInt(args[2]); + int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]); + if (slideSize > windowSize) { + System.err.println("<slide duration> must be less than or equal to <window duration>"); + } + String windowDuration = windowSize + " seconds"; + String slideDuration = slideSize + " seconds"; + + SparkSession spark = SparkSession + .builder() + .appName("JavaStructuredNetworkWordCountWindowed") + .getOrCreate(); + + // Create DataFrame representing the stream of input lines from connection to host:port + Dataset<Tuple2<String, Timestamp>> lines = spark + .readStream() + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())); + + // Split the lines into words, retaining timestamps + Dataset<Row> words = lines.flatMap( + new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() { + @Override + public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) { + List<Tuple2<String, Timestamp>> result = new ArrayList<>(); + for (String word : t._1.split(" ")) { + result.add(new Tuple2<>(word, t._2)); + } + return result.iterator(); + } + }, + Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) + ).toDF("word", "timestamp"); + + // Group the data by window and word and compute the count of each group + Dataset<Row> windowedCounts = words.groupBy( + functions.window(words.col("timestamp"), windowDuration, slideDuration), + words.col("word") + ).count().orderBy("window"); + + // Start running the query that prints the windowed word counts to the console + StreamingQuery query = windowedCounts.writeStream() + .outputMode("complete") + .format("console") + .option("truncate", "false") + .start(); + + query.awaitTermination(); + } +} diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py index 32d63c52c9..afde255058 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py @@ -16,7 +16,7 @@ # """ - Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + Counts words in UTF8 encoded, '\n' delimited text received from the network. Usage: structured_network_wordcount.py <hostname> <port> <hostname> and <port> describe the TCP server that Structured Streaming would connect to receive data. @@ -58,6 +58,7 @@ if __name__ == "__main__": # Split the lines into words words = lines.select( + # explode turns each item in an array into a separate row explode( split(lines.value, ' ') ).alias('word') diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py new file mode 100644 index 0000000000..02a7d3363d --- /dev/null +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in UTF8 encoded, '\n' delimited text received from the network over a + sliding window of configurable duration. Each line from the network is tagged + with a timestamp that is used to determine the windows into which it falls. + + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration> + [<slide duration>] + <hostname> and <port> describe the TCP server that Structured Streaming + would connect to receive data. + <window duration> gives the size of window, specified as integer number of seconds + <slide duration> gives the amount of time successive windows are offset from one another, + given in the same units as above. <slide duration> should be less than or equal to + <window duration>. If the two are equal, successive windows have no overlap. If + <slide duration> is not provided, it defaults to <window duration>. + + To run this on your local machine, you need to first run a Netcat server + `$ nc -lk 9999` + and then run the example + `$ bin/spark-submit + examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py + localhost 9999 <window duration> [<slide duration>]` + + One recommended <window duration>, <slide duration> pair is 10, 5 +""" +from __future__ import print_function + +import sys + +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split +from pyspark.sql.functions import window + +if __name__ == "__main__": + if len(sys.argv) != 5 and len(sys.argv) != 4: + msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> " + "<window duration in seconds> [<slide duration in seconds>]") + print(msg, file=sys.stderr) + exit(-1) + + host = sys.argv[1] + port = int(sys.argv[2]) + windowSize = int(sys.argv[3]) + slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize + if slideSize > windowSize: + print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr) + windowDuration = '{} seconds'.format(windowSize) + slideDuration = '{} seconds'.format(slideSize) + + spark = SparkSession\ + .builder\ + .appName("StructuredNetworkWordCountWindowed")\ + .getOrCreate() + + # Create DataFrame representing the stream of input lines from connection to host:port + lines = spark\ + .readStream\ + .format('socket')\ + .option('host', host)\ + .option('port', port)\ + .option('includeTimestamp', 'true')\ + .load() + + # Split the lines into words, retaining timestamps + # split() splits each line into an array, and explode() turns the array into multiple rows + words = lines.select( + explode(split(lines.value, ' ')).alias('word'), + lines.timestamp + ) + + # Group the data by window and word and compute the count of each group + windowedCounts = words.groupBy( + window(words.timestamp, windowDuration, slideDuration), + words.word + ).count().orderBy('window') + + # Start running the query that prints the windowed word counts to the console + query = windowedCounts\ + .writeStream\ + .outputMode('complete')\ + .format('console')\ + .option('truncate', 'false')\ + .start() + + query.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala index 433f7a181b..364bff227b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in UTF8 encoded, '\n' delimited text received from the network. * * Usage: StructuredNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Structured Streaming diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala new file mode 100644 index 0000000000..333b0a9d24 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.sql.streaming + +import java.sql.Timestamp + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a + * sliding window of configurable duration. Each line from the network is tagged + * with a timestamp that is used to determine the windows into which it falls. + * + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> + * [<slide duration>] + * <hostname> and <port> describe the TCP server that Structured Streaming + * would connect to receive data. + * <window duration> gives the size of window, specified as integer number of seconds + * <slide duration> gives the amount of time successive windows are offset from one another, + * given in the same units as above. <slide duration> should be less than or equal to + * <window duration>. If the two are equal, successive windows have no overlap. If + * <slide duration> is not provided, it defaults to <window duration>. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed + * localhost 9999 <window duration in seconds> [<slide duration in seconds>]` + * + * One recommended <window duration>, <slide duration> pair is 10, 5 + */ +object StructuredNetworkWordCountWindowed { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" + + " <window duration in seconds> [<slide duration in seconds>]") + System.exit(1) + } + + val host = args(0) + val port = args(1).toInt + val windowSize = args(2).toInt + val slideSize = if (args.length == 3) windowSize else args(3).toInt + if (slideSize > windowSize) { + System.err.println("<slide duration> must be less than or equal to <window duration>") + } + val windowDuration = s"$windowSize seconds" + val slideDuration = s"$slideSize seconds" + + val spark = SparkSession + .builder + .appName("StructuredNetworkWordCountWindowed") + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection to host:port + val lines = spark.readStream + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load().as[(String, Timestamp)] + + // Split the lines into words, retaining timestamps + val words = lines.flatMap(line => + line._1.split(" ").map(word => (word, line._2)) + ).toDF("word", "timestamp") + + // Group the data by window and word and compute the count of each group + val windowedCounts = words.groupBy( + window($"timestamp", windowDuration, slideDuration), $"word" + ).count().orderBy("window") + + // Start running the query that prints the windowed word counts to the console + val query = windowedCounts.writeStream + .outputMode("complete") + .format("console") + .option("truncate", "false") + .start() + + query.awaitTermination() + } +} +// scalastyle:on println |