diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-02-07 20:21:00 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-02-07 20:21:00 -0800 |
commit | aeb80348dd40c66b84bbc5cfe60d716fbce25acb (patch) | |
tree | b1ae06e53c7272cf4a408b7da2e1c7413061b0b2 /sql/core/src/main/java | |
parent | e33aaa2ac53a6e17e160e4e63821450b3609033b (diff) | |
download | spark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.tar.gz spark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.tar.bz2 spark-aeb80348dd40c66b84bbc5cfe60d716fbce25acb.zip |
[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations
## What changes were proposed in this pull request?
`mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`
*Requirements*
- Users should be able to specify a function that can do the following
- Access the input row corresponding to a key
- Access the previous state corresponding to a key
- Optionally, update or remove the state
- Output any number of new rows (or none at all)
*Proposed API*
```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
// Scala friendly
def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
// Java friendly
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}
// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
}
// ---------------------- Wrapper class for state data ----------------------
trait State[S] {
def exists(): Boolean
def get(): S // throws Exception is state does not exist
def getOption(): Option[S]
def update(newState: S): Unit
def remove(): Unit // exists() will be false after this
}
```
Key Semantics of the State class
- The state can be null.
- If the state.remove() is called, then state.exists() will return false, and getOption will returm None.
- After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.
*Usage*
```
val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => {
val newCount = words.size + runningCount.getOption.getOrElse(0L)
runningCount.update(newCount)
(word, newCount)
}
dataset // type is Dataset[String]
.groupByKey[String](w => w) // generates KeyValueGroupedDataset[String, String]
.mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
```
## How was this patch tested?
New unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16758 from tdas/mapWithState.
Diffstat (limited to 'sql/core/src/main/java')
2 files changed, 76 insertions, 0 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java new file mode 100644 index 0000000000..2570c8d02a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.KeyedState; + +/** + * ::Experimental:: + * Base interface for a map function used in + * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(FlatMapGroupsWithStateFunction, Encoder, Encoder)}. + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable { + Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception; +} diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java new file mode 100644 index 0000000000..614d3925e0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.KeyedState; + +/** + * ::Experimental:: + * Base interface for a map function used in + * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState(MapGroupsWithStateFunction, Encoder, Encoder)} + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { + R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception; +} |