path: root/docs/sql-programming-guide.md
author    Shixiong Zhu <shixiong@databricks.com>    2016-06-10 00:11:46 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-06-10 00:11:46 -0700
commit    00c310133df4f3893dd90d801168c2ab9841b102 (patch)
tree      ba13cb409c9cab4b214181340c7eedf6276b8388 /docs/sql-programming-guide.md
parent    5a3533e779d8e43ce0980203dfd3cbe343cc7d0a (diff)
[SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery
## What changes were proposed in this pull request?

* Add `DataFrameWriter.foreach` to allow the user to consume data in a `ContinuousQuery`
* `ForeachWriter` is the interface for the user to consume partitions of data
* Add a type parameter `T` to `DataFrameWriter`

Usage:

```scala
val ds = spark.read....stream().as[String]
ds.....write
  .queryName(...)
  .option("checkpointLocation", ...)
  .foreach(new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Prepare some resources for a partition.
      // Check `version` if possible and return `false` to skip processing
      // when the data is a duplicate.
      true
    }

    override def process(value: String): Unit = {
      // Process the data.
    }

    def close(errorOrNull: Throwable): Unit = {
      // Release the resources for the partition.
      // Check `errorOrNull` and handle the error if necessary.
    }
  })
```

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13342 from zsxwing/foreach.
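To make the interface concrete, here is a minimal sketch of a complete `ForeachWriter` under the API shown above. The `FileSinkWriter` name, the file-based sink, and the path prefix are illustrative assumptions, not part of this patch.

```scala
import java.io.{File, FileWriter, PrintWriter}

import org.apache.spark.sql.ForeachWriter

// Illustrative sketch: a ForeachWriter that appends each record of a
// partition to a local file. The file-based sink is an assumption made
// for demonstration, not something this patch provides.
class FileSinkWriter(pathPrefix: String) extends ForeachWriter[String] {

  private var out: PrintWriter = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Open one file per partition. Returning true asks the sink to call
    // process() for every record in this partition; returning false would
    // skip it (e.g. when `version` shows the data was already handled).
    out = new PrintWriter(new FileWriter(new File(s"$pathPrefix-$partitionId.txt"), true))
    true
  }

  override def process(value: String): Unit = {
    // Called once per record in the open partition.
    out.println(value)
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Always release the resources; inspect errorOrNull for failures.
    if (out != null) out.close()
  }
}
```

A writer like this could then be plugged into the snippet above as `.foreach(new FileSinkWriter("/tmp/output/part"))`, with one output file created per partition.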
Diffstat (limited to 'docs/sql-programming-guide.md')
0 files changed, 0 insertions, 0 deletions