aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-02-07 20:21:00 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-02-07 20:21:00 -0800
commitaeb80348dd40c66b84bbc5cfe60d716fbce25acb (patch)
treeb1ae06e53c7272cf4a408b7da2e1c7413061b0b2 /sql/core/src/main/java
parente33aaa2ac53a6e17e160e4e63821450b3609033b (diff)
downloadspark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.tar.gz
spark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.tar.bz2
spark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.zip
[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations
## What changes were proposed in this pull request? `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState` *Requirements* - Users should be able to specify a function that can do the following - Access the input row corresponding to a key - Access the previous state corresponding to a key - Optionally, update or remove the state - Output any number of new rows (or none at all) *Proposed API* ``` // ------------ New methods on KeyValueGroupedDataset ------------ class KeyValueGroupedDataset[K, V] { // Scala friendly def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U) def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U]) // Java friendly def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) } // ------------------- New Java-friendly function classes ------------------- public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception; } public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable { Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception; } // ---------------------- Wrapper class for state data ---------------------- trait State[S] { def exists(): Boolean def get(): S // throws Exception is state does not exist def getOption(): Option[S] def update(newState: S): Unit def remove(): Unit // exists() will be false after this } ``` Key Semantics of the State class - The state can be null. - If the state.remove() is called, then state.exists() will return false, and getOption will returm None. - After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...). - None of the operations are thread-safe. This is to avoid memory barriers. *Usage* ``` val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => { val newCount = words.size + runningCount.getOption.getOrElse(0L) runningCount.update(newCount) (word, newCount) } dataset // type is Dataset[String] .groupByKey[String](w => w) // generates KeyValueGroupedDataset[String, String] .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)] ``` ## How was this patch tested? New unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16758 from tdas/mapWithState.
Diffstat (limited to 'sql/core/src/main/java')
-rw-r--r--sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java38
-rw-r--r--sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java38
2 files changed, 76 insertions, 0 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
new file mode 100644
index 0000000000..2570c8d02a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.KeyedState;
+
+/**
+ * ::Experimental::
+ * Base interface for a map function used in
+ * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(FlatMapGroupsWithStateFunction, Encoder, Encoder)}.
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
+ Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
new file mode 100644
index 0000000000..614d3925e0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.KeyedState;
+
+/**
+ * ::Experimental::
+ * Base interface for a map function used in
+ * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState(MapGroupsWithStateFunction, Encoder, Encoder)}
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
+ R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+}