diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-10-05 16:45:45 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-10-05 16:45:45 -0700 |
commit | 9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db (patch) | |
tree | ece99a6177b900c44cca0a5fa4596c0f41c2cc13 /dev/run-tests.py | |
parent | 5fd54b994e2078dbf0794932b4e0ffa9a9eda0c3 (diff) | |
download | spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.tar.gz spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.tar.bz2 spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.zip |
[SPARK-17346][SQL] Add Kafka source for Structured Streaming
## What changes were proposed in this pull request?
This PR adds a new project ` external/kafka-0-10-sql` for Structured Streaming Kafka source.
It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
tdas did most of work and part of them was inspired by koeninger's work.
### Introduction
The Kafka source is a structured streaming data source to poll data from Kafka. The schema of reading data is as follows:
Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int
The source can deal with deleting topics. However, the user should make sure there is no Spark job processing the data when deleting a topic.
### Configuration
The user can use `DataStreamReader.option` to set the following configurations.
Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest" which is from the earliest offset, or "latest" which is just from the latest offset. Note: This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.
failOnDataLost | [true, false] | true | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe the topic. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fatch Kafka latest offsets.
fetchOffset.retryIntervalMs | long | 10 | milliseconds to wait before retrying to fetch Kafka offsets
Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, `stream.option("kafka.bootstrap.servers", "host:port")`
### Usage
* Subscribe to 1 topic
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic1")
.load()
```
* Subscribe to multiple topics
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic1,topic2")
.load()
```
* Subscribe to a pattern
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribePattern", "topic.*")
.load()
```
## How was this patch tested?
The new unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>
Closes #15102 from zsxwing/kafka-source.
Diffstat (limited to 'dev/run-tests.py')
-rwxr-xr-x | dev/run-tests.py | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/dev/run-tests.py b/dev/run-tests.py index ae4b5306fc..5d661f5f1a 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules): ['graphx', 'examples'] >>> x = [x.name for x in determine_modules_to_test([modules.sql])] >>> x # doctest: +NORMALIZE_WHITESPACE - ['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver', + ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml'] """ modules_to_test = set() |