path: root/examples
author      Tathagata Das <tathagata.das1565@gmail.com>  2017-04-05 16:03:04 -0700
committer   Tathagata Das <tathagata.das1565@gmail.com>  2017-04-05 16:03:04 -0700
commit      9543fc0e08a21680961689ea772441c49fcd52ee (patch)
tree        3907a5efd012da783b7d188af07083a85bde45f7 /examples
parent      e2773996b8d1c0214d9ffac634a059b4923caf7b (diff)
download    spark-9543fc0e08a21680961689ea772441c49fcd52ee.tar.gz
            spark-9543fc0e08a21680961689ea772441c49fcd52ee.tar.bz2
            spark-9543fc0e08a21680961689ea772441c49fcd52ee.zip
[SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState
## What changes were proposed in this pull request?

- Fixed bug in the Java API not passing the timeout conf to the Scala API
- Updated markdown docs
- Updated Scala docs
- Added Scala and Java examples

## How was this patch tested?

Manually ran the examples.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17539 from tdas/SPARK-20224.
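The updated docs also cover streaming deduplication with dropDuplicates, which is not exemplified in this diff. A minimal Scala sketch of that usage, assuming a socket source with includeTimestamp and the illustrative column names eventId/eventTime (not part of the committed examples), could look like:

import org.apache.spark.sql.SparkSession

object StreamingDeduplicationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StreamingDeduplicationSketch")
      .getOrCreate()

    // Socket source with includeTimestamp yields (value, timestamp); rename for clarity.
    // eventId / eventTime are illustrative names, not part of this commit.
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)
      .load()
      .toDF("eventId", "eventTime")

    // Deduplicate on eventId and eventTime; the watermark bounds how long
    // per-key deduplication state is retained.
    val deduplicated = events
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("eventId", "eventTime")

    val query = deduplicated.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

As with mapGroupsWithState in the examples below, the watermark is what lets Spark eventually drop old per-key state instead of keeping it indefinitely.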
Diffstat (limited to 'examples')
-rw-r--r--    examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java    255
-rw-r--r--    examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala      151
2 files changed, 406 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
new file mode 100644
index 0000000000..da3a5dfe86
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.GroupState;
+import org.apache.spark.sql.streaming.GroupStateTimeout;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.*;
+
+import scala.Tuple2;
+
+/**
+ * Sessionizes events in UTF8 encoded, '\n' delimited text received from the network.
+ * Each word in a line is treated as the sessionId of an event.
+ * <p>
+ * Usage: JavaStructuredSessionization <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ * <p>
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example sql.streaming.JavaStructuredSessionization
+ * localhost 9999`
+ */
+public final class JavaStructuredSessionization {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: JavaStructuredSessionization <hostname> <port>");
+ System.exit(1);
+ }
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaStructuredSessionization")
+ .getOrCreate();
+
+ // Create DataFrame representing the stream of input lines from connection to host:port
+ Dataset<Row> lines = spark
+ .readStream()
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .option("includeTimestamp", true)
+ .load();
+
+ FlatMapFunction<LineWithTimestamp, Event> linesToEvents =
+ new FlatMapFunction<LineWithTimestamp, Event>() {
+ @Override
+ public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exception {
+ ArrayList<Event> eventList = new ArrayList<Event>();
+ for (String word : lineWithTimestamp.getLine().split(" ")) {
+ eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
+ }
+ System.out.println(
+ "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size());
+ return eventList.iterator();
+ }
+ };
+
+ // Split the lines into words, treat words as sessionId of events
+ Dataset<Event> events = lines
+ .withColumnRenamed("value", "line")
+ .as(Encoders.bean(LineWithTimestamp.class))
+ .flatMap(linesToEvents, Encoders.bean(Event.class));
+
+ // Sessionize the events. Track the number of events, the start and end timestamps of the
+ // session, and report session updates.
+ //
+ // Step 1: Define the state update function
+ MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate> stateUpdateFunc =
+ new MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate>() {
+ @Override public SessionUpdate call(
+ String sessionId, Iterator<Event> events, GroupState<SessionInfo> state)
+ throws Exception {
+ // If timed out, then remove session and send final update
+ if (state.hasTimedOut()) {
+ SessionUpdate finalUpdate = new SessionUpdate(
+ sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true);
+ state.remove();
+ return finalUpdate;
+
+ } else {
+ // Find max and min timestamps in events
+ long maxTimestampMs = Long.MIN_VALUE;
+ long minTimestampMs = Long.MAX_VALUE;
+ int numNewEvents = 0;
+ while (events.hasNext()) {
+ Event e = events.next();
+ long timestampMs = e.getTimestamp().getTime();
+ maxTimestampMs = Math.max(timestampMs, maxTimestampMs);
+ minTimestampMs = Math.min(timestampMs, minTimestampMs);
+ numNewEvents += 1;
+ }
+ SessionInfo updatedSession = new SessionInfo();
+
+ // Update start and end timestamps in session
+ if (state.exists()) {
+ SessionInfo oldSession = state.get();
+ updatedSession.setNumEvents(oldSession.numEvents + numNewEvents);
+ updatedSession.setStartTimestampMs(oldSession.startTimestampMs);
+ updatedSession.setEndTimestampMs(Math.max(oldSession.endTimestampMs, maxTimestampMs));
+ } else {
+ updatedSession.setNumEvents(numNewEvents);
+ updatedSession.setStartTimestampMs(minTimestampMs);
+ updatedSession.setEndTimestampMs(maxTimestampMs);
+ }
+ state.update(updatedSession);
+ // Set timeout such that the session expires if no data is received for 10 seconds
+ state.setTimeoutDuration("10 seconds");
+ return new SessionUpdate(
+ sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false);
+ }
+ }
+ };
+
+ // Step 2: Apply the state update function to the events streaming Dataset grouped by sessionId
+ Dataset<SessionUpdate> sessionUpdates = events
+ .groupByKey(
+ new MapFunction<Event, String>() {
+ @Override public String call(Event event) throws Exception {
+ return event.getSessionId();
+ }
+ }, Encoders.STRING())
+ .mapGroupsWithState(
+ stateUpdateFunc,
+ Encoders.bean(SessionInfo.class),
+ Encoders.bean(SessionUpdate.class),
+ GroupStateTimeout.ProcessingTimeTimeout());
+
+ // Start running the query that prints the session updates to the console
+ StreamingQuery query = sessionUpdates
+ .writeStream()
+ .outputMode("update")
+ .format("console")
+ .start();
+
+ query.awaitTermination();
+ }
+
+ /**
+ * User-defined data type representing the raw lines with timestamps.
+ */
+ public static class LineWithTimestamp implements Serializable {
+ private String line;
+ private Timestamp timestamp;
+
+ public Timestamp getTimestamp() { return timestamp; }
+ public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
+
+ public String getLine() { return line; }
+ public void setLine(String line) { this.line = line; }
+ }
+
+ /**
+ * User-defined data type representing the input events
+ */
+ public static class Event implements Serializable {
+ private String sessionId;
+ private Timestamp timestamp;
+
+ public Event() { }
+ public Event(String sessionId, Timestamp timestamp) {
+ this.sessionId = sessionId;
+ this.timestamp = timestamp;
+ }
+
+ public Timestamp getTimestamp() { return timestamp; }
+ public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
+
+ public String getSessionId() { return sessionId; }
+ public void setSessionId(String sessionId) { this.sessionId = sessionId; }
+ }
+
+ /**
+ * User-defined data type for storing session information as state in mapGroupsWithState.
+ */
+ public static class SessionInfo implements Serializable {
+ private int numEvents = 0;
+ private long startTimestampMs = -1;
+ private long endTimestampMs = -1;
+
+ public int getNumEvents() { return numEvents; }
+ public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
+
+ public long getStartTimestampMs() { return startTimestampMs; }
+ public void setStartTimestampMs(long startTimestampMs) {
+ this.startTimestampMs = startTimestampMs;
+ }
+
+ public long getEndTimestampMs() { return endTimestampMs; }
+ public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }
+
+ public long getDurationMs() { return endTimestampMs - startTimestampMs; }
+ @Override public String toString() {
+ return "SessionInfo(numEvents = " + numEvents +
+ ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";
+ }
+ }
+
+ /**
+ * User-defined data type representing the update information returned by mapGroupsWithState.
+ */
+ public static class SessionUpdate implements Serializable {
+ private String id;
+ private long durationMs;
+ private int numEvents;
+ private boolean expired;
+
+ public SessionUpdate() { }
+
+ public SessionUpdate(String id, long durationMs, int numEvents, boolean expired) {
+ this.id = id;
+ this.durationMs = durationMs;
+ this.numEvents = numEvents;
+ this.expired = expired;
+ }
+
+ public String getId() { return id; }
+ public void setId(String id) { this.id = id; }
+
+ public long getDurationMs() { return durationMs; }
+ public void setDurationMs(long durationMs) { this.durationMs = durationMs; }
+
+ public int getNumEvents() { return numEvents; }
+ public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
+
+ public boolean isExpired() { return expired; }
+ public void setExpired(boolean expired) { this.expired = expired; }
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
new file mode 100644
index 0000000000..2ce792c008
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming._
+
+
+/**
+ * Sessionizes events in UTF8 encoded, '\n' delimited text received from the network.
+ * Each word in a line is treated as the sessionId of an event.
+ *
+ * Usage: StructuredSessionization <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example sql.streaming.StructuredSessionization
+ * localhost 9999`
+ */
+object StructuredSessionization {
+
+ def main(args: Array[String]): Unit = {
+ if (args.length < 2) {
+ System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
+ System.exit(1)
+ }
+
+ val host = args(0)
+ val port = args(1).toInt
+
+ val spark = SparkSession
+ .builder
+ .appName("StructuredSessionization")
+ .getOrCreate()
+
+ import spark.implicits._
+
+ // Create DataFrame representing the stream of input lines from connection to host:port
+ val lines = spark.readStream
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .option("includeTimestamp", true)
+ .load()
+
+ // Split the lines into words, treat words as sessionId of events
+ val events = lines
+ .as[(String, Timestamp)]
+ .flatMap { case (line, timestamp) =>
+ line.split(" ").map(word => Event(sessionId = word, timestamp))
+ }
+
+ // Sessionize the events. Track the number of events, the start and end timestamps of the
+ // session, and report session updates.
+ val sessionUpdates = events
+ .groupByKey(event => event.sessionId)
+ .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
+
+ case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
+
+ // If timed out, then remove session and send final update
+ if (state.hasTimedOut) {
+ val finalUpdate =
+ SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
+ state.remove()
+ finalUpdate
+ } else {
+ // Update start and end timestamps in session
+ val timestamps = events.map(_.timestamp.getTime).toSeq
+ val updatedSession = if (state.exists) {
+ val oldSession = state.get
+ SessionInfo(
+ oldSession.numEvents + timestamps.size,
+ oldSession.startTimestampMs,
+ math.max(oldSession.endTimestampMs, timestamps.max))
+ } else {
+ SessionInfo(timestamps.size, timestamps.min, timestamps.max)
+ }
+ state.update(updatedSession)
+
+ // Set timeout such that the session expires if no data is received for 10 seconds
+ state.setTimeoutDuration("10 seconds")
+ SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
+ }
+ }
+
+ // Start running the query that prints the session updates to the console
+ val query = sessionUpdates
+ .writeStream
+ .outputMode("update")
+ .format("console")
+ .start()
+
+ query.awaitTermination()
+ }
+}
+/** User-defined data type representing the input events */
+case class Event(sessionId: String, timestamp: Timestamp)
+
+/**
+ * User-defined data type for storing session information as state in mapGroupsWithState.
+ *
+ * @param numEvents total number of events received in the session
+ * @param startTimestampMs timestamp of first event received in the session when it started
+ * @param endTimestampMs timestamp of last event received in the session before it expired
+ */
+case class SessionInfo(
+ numEvents: Int,
+ startTimestampMs: Long,
+ endTimestampMs: Long) {
+
+ /** Duration of the session, between the first and last events */
+ def durationMs: Long = endTimestampMs - startTimestampMs
+}
+
+/**
+ * User-defined data type representing the update information returned by mapGroupsWithState.
+ *
+ * @param id Id of the session
+ * @param durationMs Duration the session was active, that is, from first event to its expiry
+ * @param numEvents Number of events received by the session while it was active
+ * @param expired Whether the session has expired
+ */
+case class SessionUpdate(
+ id: String,
+ durationMs: Long,
+ numEvents: Int,
+ expired: Boolean)
+
+// scalastyle:on println
+