From 9543fc0e08a21680961689ea772441c49fcd52ee Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 5 Apr 2017 16:03:04 -0700 Subject: [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState ## What changes were proposed in this pull request? - Fixed bug in Java API not passing timeout conf to scala API - Updated markdown docs - Updated scala docs - Added scala and Java example ## How was this patch tested? Manually ran examples. Author: Tathagata Das Closes #17539 from tdas/SPARK-20224. --- .../streaming/JavaStructuredSessionization.java | 255 +++++++++++++++++++++ .../sql/streaming/StructuredSessionization.scala | 151 ++++++++++++ 2 files changed, 406 insertions(+) create mode 100644 examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala (limited to 'examples') diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java new file mode 100644 index 0000000000..da3a5dfe86 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql.streaming; + +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsWithStateFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.streaming.GroupState; +import org.apache.spark.sql.streaming.GroupStateTimeout; +import org.apache.spark.sql.streaming.StreamingQuery; + +import java.io.Serializable; +import java.sql.Timestamp; +import java.util.*; + +import scala.Tuple2; + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network. + *

+ * Usage: JavaStructuredNetworkWordCount + * and describe the TCP server that Structured Streaming + * would connect to receive data. + *

+ * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.JavaStructuredSessionization + * localhost 9999` + */ +public final class JavaStructuredSessionization { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: JavaStructuredSessionization "); + System.exit(1); + } + + String host = args[0]; + int port = Integer.parseInt(args[1]); + + SparkSession spark = SparkSession + .builder() + .appName("JavaStructuredSessionization") + .getOrCreate(); + + // Create DataFrame representing the stream of input lines from connection to host:port + Dataset lines = spark + .readStream() + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load(); + + FlatMapFunction linesToEvents = + new FlatMapFunction() { + @Override + public Iterator call(LineWithTimestamp lineWithTimestamp) throws Exception { + ArrayList eventList = new ArrayList(); + for (String word : lineWithTimestamp.getLine().split(" ")) { + eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); + } + System.out.println( + "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size()); + return eventList.iterator(); + } + }; + + // Split the lines into words, treat words as sessionId of events + Dataset events = lines + .withColumnRenamed("value", "line") + .as(Encoders.bean(LineWithTimestamp.class)) + .flatMap(linesToEvents, Encoders.bean(Event.class)); + + // Sessionize the events. Track number of events, start and end timestamps of session, and + // and report session updates. + // + // Step 1: Define the state update function + MapGroupsWithStateFunction stateUpdateFunc = + new MapGroupsWithStateFunction() { + @Override public SessionUpdate call( + String sessionId, Iterator events, GroupState state) + throws Exception { + // If timed out, then remove session and send final update + if (state.hasTimedOut()) { + SessionUpdate finalUpdate = new SessionUpdate( + sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true); + state.remove(); + return finalUpdate; + + } else { + // Find max and min timestamps in events + long maxTimestampMs = Long.MIN_VALUE; + long minTimestampMs = Long.MAX_VALUE; + int numNewEvents = 0; + while (events.hasNext()) { + Event e = events.next(); + long timestampMs = e.getTimestamp().getTime(); + maxTimestampMs = Math.max(timestampMs, maxTimestampMs); + minTimestampMs = Math.min(timestampMs, minTimestampMs); + numNewEvents += 1; + } + SessionInfo updatedSession = new SessionInfo(); + + // Update start and end timestamps in session + if (state.exists()) { + SessionInfo oldSession = state.get(); + updatedSession.setNumEvents(oldSession.numEvents + numNewEvents); + updatedSession.setStartTimestampMs(oldSession.startTimestampMs); + updatedSession.setEndTimestampMs(Math.max(oldSession.endTimestampMs, maxTimestampMs)); + } else { + updatedSession.setNumEvents(numNewEvents); + updatedSession.setStartTimestampMs(minTimestampMs); + updatedSession.setEndTimestampMs(maxTimestampMs); + } + state.update(updatedSession); + // Set timeout such that the session will be expired if no data received for 10 seconds + state.setTimeoutDuration("10 seconds"); + return new SessionUpdate( + sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false); + } + } + }; + + // Step 2: Apply the state update function to the events streaming Dataset grouped by sessionId + Dataset sessionUpdates = events + .groupByKey( + new MapFunction() { + @Override public String call(Event event) throws Exception { + return event.getSessionId(); + } + }, Encoders.STRING()) + .mapGroupsWithState( + stateUpdateFunc, + Encoders.bean(SessionInfo.class), + Encoders.bean(SessionUpdate.class), + GroupStateTimeout.ProcessingTimeTimeout()); + + // Start running the query that prints the session updates to the console + StreamingQuery query = sessionUpdates + .writeStream() + .outputMode("update") + .format("console") + .start(); + + query.awaitTermination(); + } + + /** + * User-defined data type representing the raw lines with timestamps. + */ + public static class LineWithTimestamp implements Serializable { + private String line; + private Timestamp timestamp; + + public Timestamp getTimestamp() { return timestamp; } + public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; } + + public String getLine() { return line; } + public void setLine(String sessionId) { this.line = sessionId; } + } + + /** + * User-defined data type representing the input events + */ + public static class Event implements Serializable { + private String sessionId; + private Timestamp timestamp; + + public Event() { } + public Event(String sessionId, Timestamp timestamp) { + this.sessionId = sessionId; + this.timestamp = timestamp; + } + + public Timestamp getTimestamp() { return timestamp; } + public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; } + + public String getSessionId() { return sessionId; } + public void setSessionId(String sessionId) { this.sessionId = sessionId; } + } + + /** + * User-defined data type for storing a session information as state in mapGroupsWithState. + */ + public static class SessionInfo implements Serializable { + private int numEvents = 0; + private long startTimestampMs = -1; + private long endTimestampMs = -1; + + public int getNumEvents() { return numEvents; } + public void setNumEvents(int numEvents) { this.numEvents = numEvents; } + + public long getStartTimestampMs() { return startTimestampMs; } + public void setStartTimestampMs(long startTimestampMs) { + this.startTimestampMs = startTimestampMs; + } + + public long getEndTimestampMs() { return endTimestampMs; } + public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; } + + public long getDurationMs() { return endTimestampMs - startTimestampMs; } + @Override public String toString() { + return "SessionInfo(numEvents = " + numEvents + + ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")"; + } + } + + /** + * User-defined data type representing the update information returned by mapGroupsWithState. + */ + public static class SessionUpdate implements Serializable { + private String id; + private long durationMs; + private int numEvents; + private boolean expired; + + public SessionUpdate() { } + + public SessionUpdate(String id, long durationMs, int numEvents, boolean expired) { + this.id = id; + this.durationMs = durationMs; + this.numEvents = numEvents; + this.expired = expired; + } + + public String getId() { return id; } + public void setId(String id) { this.id = id; } + + public long getDurationMs() { return durationMs; } + public void setDurationMs(long durationMs) { this.durationMs = durationMs; } + + public int getNumEvents() { return numEvents; } + public void setNumEvents(int numEvents) { this.numEvents = numEvents; } + + public boolean isExpired() { return expired; } + public void setExpired(boolean expired) { this.expired = expired; } + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala new file mode 100644 index 0000000000..2ce792c008 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.sql.streaming + +import java.sql.Timestamp + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming._ + + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network. + * + * Usage: MapGroupsWithState + * and describe the TCP server that Structured Streaming + * would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.StructuredNetworkWordCount + * localhost 9999` + */ +object StructuredSessionization { + + def main(args: Array[String]): Unit = { + if (args.length < 2) { + System.err.println("Usage: StructuredNetworkWordCount ") + System.exit(1) + } + + val host = args(0) + val port = args(1).toInt + + val spark = SparkSession + .builder + .appName("StructuredSessionization") + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection to host:port + val lines = spark.readStream + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load() + + // Split the lines into words, treat words as sessionId of events + val events = lines + .as[(String, Timestamp)] + .flatMap { case (line, timestamp) => + line.split(" ").map(word => Event(sessionId = word, timestamp)) + } + + // Sessionize the events. Track number of events, start and end timestamps of session, and + // and report session updates. + val sessionUpdates = events + .groupByKey(event => event.sessionId) + .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) { + + case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) => + + // If timed out, then remove session and send final update + if (state.hasTimedOut) { + val finalUpdate = + SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true) + state.remove() + finalUpdate + } else { + // Update start and end timestamps in session + val timestamps = events.map(_.timestamp.getTime).toSeq + val updatedSession = if (state.exists) { + val oldSession = state.get + SessionInfo( + oldSession.numEvents + timestamps.size, + oldSession.startTimestampMs, + math.max(oldSession.endTimestampMs, timestamps.max)) + } else { + SessionInfo(timestamps.size, timestamps.min, timestamps.max) + } + state.update(updatedSession) + + // Set timeout such that the session will be expired if no data received for 10 seconds + state.setTimeoutDuration("10 seconds") + SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false) + } + } + + // Start running the query that prints the session updates to the console + val query = sessionUpdates + .writeStream + .outputMode("update") + .format("console") + .start() + + query.awaitTermination() + } +} +/** User-defined data type representing the input events */ +case class Event(sessionId: String, timestamp: Timestamp) + +/** + * User-defined data type for storing a session information as state in mapGroupsWithState. + * + * @param numEvents total number of events received in the session + * @param startTimestampMs timestamp of first event received in the session when it started + * @param endTimestampMs timestamp of last event received in the session before it expired + */ +case class SessionInfo( + numEvents: Int, + startTimestampMs: Long, + endTimestampMs: Long) { + + /** Duration of the session, between the first and last events */ + def durationMs: Long = endTimestampMs - startTimestampMs +} + +/** + * User-defined data type representing the update information returned by mapGroupsWithState. + * + * @param id Id of the session + * @param durationMs Duration the session was active, that is, from first event to its expiry + * @param numEvents Number of events received by the session while it was active + * @param expired Is the session active or expired + */ +case class SessionUpdate( + id: String, + durationMs: Long, + numEvents: Int, + expired: Boolean) + +// scalastyle:on println + -- cgit v1.2.3