| author | Shixiong Zhu <shixiong@databricks.com> | 2016-06-10 00:11:46 -0700 |
|---|---|---|
| committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-06-10 00:11:46 -0700 |
| commit | 00c310133df4f3893dd90d801168c2ab9841b102 (patch) | |
| tree | ba13cb409c9cab4b214181340c7eedf6276b8388 /docs/sql-programming-guide.md | |
| parent | 5a3533e779d8e43ce0980203dfd3cbe343cc7d0a (diff) | |
[SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery
## What changes were proposed in this pull request?
* Add `DataFrameWriter.foreach` to allow the user to consume data in a `ContinuousQuery`
* `ForeachWriter` is the interface through which the user consumes partitions of data
* Add a type parameter `T` to `DataFrameWriter`

Usage:
```scala
val ds = spark.read....stream().as[String]
ds.....write
  .queryName(...)
  .option("checkpointLocation", ...)
  .foreach(new ForeachWriter[String] {
    override def open(partitionId: Long, version: Long): Boolean = {
      // Prepare resources for a partition.
      // Check `version` if possible, and return `false` to skip processing
      // when this batch of data has already been handled.
    }
    override def process(value: String): Unit = {
      // Process one record.
    }
    override def close(errorOrNull: Throwable): Unit = {
      // Release the partition's resources.
      // Check `errorOrNull` and handle the error if necessary.
    }
  })
```
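To make the open/process/close contract above concrete, here is a self-contained sketch. `SimpleForeachWriter` and `runPartition` are local stand-ins invented for illustration (they are not Spark's actual `ForeachWriter` or its execution path): the driver calls `open` once per partition, calls `process` for each row only if `open` returned `true`, and always calls `close`, passing any error that occurred.

```scala
// Local stand-in for Spark's ForeachWriter, so the lifecycle can run without a Spark runtime.
abstract class SimpleForeachWriter[T] {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Drives one partition through the contract: open, process each row only if
// open returned true, then always close (with the error, if one was thrown).
def runPartition[T](writer: SimpleForeachWriter[T], partitionId: Long,
                    version: Long, rows: Seq[T]): Unit = {
  var error: Throwable = null
  try {
    if (writer.open(partitionId, version)) {
      rows.foreach(writer.process)
    }
  } catch {
    case t: Throwable => error = t
  } finally {
    writer.close(error)
  }
}

// Example writer: collects values, and uses `version` to skip batches it has
// already seen, as the comment in open() above suggests.
class CollectingWriter extends SimpleForeachWriter[String] {
  val seenVersions = scala.collection.mutable.Set[Long]()
  val collected = scala.collection.mutable.ArrayBuffer[String]()
  override def open(partitionId: Long, version: Long): Boolean = seenVersions.add(version)
  override def process(value: String): Unit = collected += value
  override def close(errorOrNull: Throwable): Unit = ()
}

val w = new CollectingWriter
runPartition(w, partitionId = 0L, version = 1L, Seq("a", "b"))
runPartition(w, partitionId = 0L, version = 1L, Seq("a", "b")) // duplicate version: open returns false, rows skipped
println(w.collected.mkString(","))
```

The duplicate-skipping behavior in `open` is why the API exposes `version`: on recovery after a failure, the same batch may be delivered again, and an idempotent sink can detect and skip it.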
## How was this patch tested?
New unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13342 from zsxwing/foreach.
Diffstat (limited to 'docs/sql-programming-guide.md')
0 files changed, 0 insertions, 0 deletions