author    Shixiong Zhu <shixiong@databricks.com>    2016-10-05 16:45:45 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-10-05 16:45:45 -0700
commit    9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db (patch)
tree      ece99a6177b900c44cca0a5fa4596c0f41c2cc13 /dev
parent    5fd54b994e2078dbf0794932b4e0ffa9a9eda0c3 (diff)
download  spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.tar.gz
          spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.tar.bz2
          spark-9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db.zip
[SPARK-17346][SQL] Add Kafka source for Structured Streaming
## What changes were proposed in this pull request?

This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source. It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

tdas did most of the work, and parts of it were inspired by koeninger's work.

### Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the data it reads is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can handle deleted topics. However, the user should make sure no Spark job is processing the data when a topic is deleted.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which is from the earliest offsets, or "latest", which is just from the latest offsets. Note: this only applies when a new streaming query is started; resuming will always pick up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it's possible that data has been lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it doesn't work as expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`.

### Usage

* Subscribe to 1 topic

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

A fuller end-to-end sketch, combining these options with the schema described above, follows after this message.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes #15102 from zsxwing/kafka-source.
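For illustration, here is a minimal end-to-end sketch combining the source options above with a console sink. Like the usage examples, it assumes an existing `SparkSession` named `spark`; the broker address, topic name, and checkpoint path are placeholders, and the `console` sink and `CAST` decoding are standard Structured Streaming / Spark SQL features rather than part of this patch.

```Scala
// Minimal sketch (placeholder broker/topic/checkpoint values): read one topic
// from the new Kafka source, decode the binary key/value columns, and print
// each batch to stdout via the console sink.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .option("startingOffset", "earliest") // start from the earliest offsets; default is "latest"
  .load()

// key and value arrive as binary (see the schema table); cast them for display.
val decoded = df.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset")

// The console sink is for debugging only; checkpointLocation is a placeholder path.
val query = decoded.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-source-checkpoint")
  .start()

query.awaitTermination()
```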
Diffstat (limited to 'dev')
-rwxr-xr-x  dev/run-tests.py                 2
-rw-r--r--  dev/sparktestsupport/modules.py  12
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/dev/run-tests.py b/dev/run-tests.py
index ae4b5306fc..5d661f5f1a 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
['graphx', 'examples']
>>> x = [x.name for x in determine_modules_to_test([modules.sql])]
>>> x # doctest: +NORMALIZE_WHITESPACE
- ['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver',
+ ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
"""
modules_to_test = set()
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 050cdf0437..5f14683d9a 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -158,6 +158,18 @@ hive_thriftserver = Module(
)
+sql_kafka = Module(
+ name="sql-kafka-0-10",
+ dependencies=[sql],
+ source_file_regexes=[
+ "external/kafka-0-10-sql",
+ ],
+ sbt_test_goals=[
+ "sql-kafka-0-10/test",
+ ]
+)
+
+
sketch = Module(
name="sketch",
dependencies=[tags],