path: root/docs/structured-streaming-kafka-integration.md
author    Shixiong Zhu <shixiong@databricks.com>    2016-10-05 16:45:45 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-10-05 16:45:45 -0700
commit    9293734d35eb3d6e4fd4ebb86f54dd5d3a35e6db (patch)
tree      ece99a6177b900c44cca0a5fa4596c0f41c2cc13 /docs/structured-streaming-kafka-integration.md
parent    5fd54b994e2078dbf0794932b4e0ffa9a9eda0c3 (diff)
[SPARK-17346][SQL] Add Kafka source for Structured Streaming
## What changes were proposed in this pull request?

This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source. It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

tdas did most of the work, and part of it was inspired by koeninger's work.

### Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the data it reads is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can handle topic deletion. However, the user should make sure no Spark job is processing the data when a topic is deleted.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest" which is from the earliest offset, or "latest" which is just from the latest offset. Note: This only applies when a new streaming query is started; resuming will always pick up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it if it does not work as expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" options can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topic(s). Only one of "subscribe" and "subscribePattern" options can be specified for the Kafka source.
kafkaConsumer.pollTimeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`.

### Usage

* Subscribe to 1 topic

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes #15102 from zsxwing/kafka-source.
Diffstat (limited to 'docs/structured-streaming-kafka-integration.md')
-rw-r--r--  docs/structured-streaming-kafka-integration.md  239
1 file changed, 239 insertions, 0 deletions
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
new file mode 100644
index 0000000000..668489addf
--- /dev/null
+++ b/docs/structured-streaming-kafka-integration.md
@@ -0,0 +1,239 @@
+---
+layout: global
+title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
+---
+
+Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
+
+### Linking
+For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
+
+ groupId = org.apache.spark
+ artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+For Python applications, you need to add this library and its dependencies when deploying your
+application. See the [Deploying](#deploying) subsection below.
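+
+For example, with sbt, the coordinates above amount to a single dependency line. This is only a
+sketch: the version string below is illustrative, so substitute the Spark release you actually
+build against.
+
+    // build.sbt (illustrative only; use the version matching your Spark deployment)
+    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"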
+
+### Creating a Kafka Source Stream
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+ // Subscribe to 1 topic
+ val ds1 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load()
+ ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ // Subscribe to multiple topics
+ val ds2 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .load()
+ ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ // Subscribe to a pattern
+ val ds3 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .load()
+ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+</div>
+<div data-lang="java" markdown="1">
+
+    // Subscribe to 1 topic
+    Dataset<Row> ds1 = spark
+      .readStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "topic1")
+      .load();
+    ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+    // Subscribe to multiple topics
+    Dataset<Row> ds2 = spark
+      .readStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "topic1,topic2")
+      .load();
+    ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+    // Subscribe to a pattern
+    Dataset<Row> ds3 = spark
+      .readStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribePattern", "topic.*")
+      .load();
+    ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+</div>
+<div data-lang="python" markdown="1">
+
+    # Subscribe to 1 topic
+    ds1 = spark \
+      .readStream \
+      .format("kafka") \
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+      .option("subscribe", "topic1") \
+      .load()
+    ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+    # Subscribe to multiple topics
+    ds2 = spark \
+      .readStream \
+      .format("kafka") \
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+      .option("subscribe", "topic1,topic2") \
+      .load()
+    ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+    # Subscribe to a pattern
+    ds3 = spark \
+      .readStream \
+      .format("kafka") \
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+      .option("subscribePattern", "topic.*") \
+      .load()
+    ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+</div>
+</div>
+
+Each row in the source has the following schema:
+<table class="table">
+<tr><th>Column</th><th>Type</th></tr>
+<tr>
+ <td>key</td>
+ <td>binary</td>
+</tr>
+<tr>
+ <td>value</td>
+ <td>binary</td>
+</tr>
+<tr>
+ <td>topic</td>
+ <td>string</td>
+</tr>
+<tr>
+ <td>partition</td>
+ <td>int</td>
+</tr>
+<tr>
+ <td>offset</td>
+ <td>long</td>
+</tr>
+<tr>
+ <td>timestamp</td>
+ <td>long</td>
+</tr>
+<tr>
+ <td>timestampType</td>
+ <td>int</td>
+</tr>
+</table>
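+
+The snippets above only define the streaming DataFrame. As a minimal sketch of how it can be
+consumed (assuming the `spark` session and `ds1` from the Scala example above; the console sink
+here is just for illustration and is not part of the Kafka source):
+
+    // Cast the binary key/value columns, keep some Kafka metadata columns, and
+    // print each micro-batch to the console until the query is stopped.
+    val query = ds1
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset")
+      .writeStream
+      .format("console")
+      .start()
+    query.awaitTermination()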
+
+The following options must be set for the Kafka source.
+
+<table class="table">
+<tr><th>Option</th><th>value</th><th>meaning</th></tr>
+<tr>
+ <td>subscribe</td>
+ <td>A comma-separated list of topics</td>
+  <td>The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" options can be
+  specified for the Kafka source.</td>
+</tr>
+<tr>
+ <td>subscribePattern</td>
+ <td>Java regex string</td>
+  <td>The pattern used to subscribe to topic(s). Only one of "subscribe" and "subscribePattern"
+  options can be specified for the Kafka source.</td>
+</tr>
+<tr>
+ <td>kafka.bootstrap.servers</td>
+ <td>A comma-separated list of host:port</td>
+ <td>The Kafka "bootstrap.servers" configuration.</td>
+</tr>
+</table>
+
+The following configurations are optional:
+
+<table class="table">
+<tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr>
+<tr>
+ <td>startingOffset</td>
+ <td>["earliest", "latest"]</td>
+ <td>"latest"</td>
+  <td>The start point when a query is started, either "earliest" which is from the earliest offset,
+  or "latest" which is just from the latest offset. Note: This only applies when a new streaming
+  query is started; resuming will always pick up from where the query left off.</td>
+</tr>
+<tr>
+ <td>failOnDataLoss</td>
+ <td>[true, false]</td>
+ <td>true</td>
+  <td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
+  offsets are out of range). This may be a false alarm. You can disable it if it does not work
+  as expected.</td>
+</tr>
+<tr>
+ <td>kafkaConsumer.pollTimeoutMs</td>
+ <td>long</td>
+ <td>512</td>
+ <td>The timeout in milliseconds to poll data from Kafka in executors.</td>
+</tr>
+<tr>
+ <td>fetchOffset.numRetries</td>
+ <td>int</td>
+ <td>3</td>
+  <td>Number of times to retry before giving up fetching the latest Kafka offsets.</td>
+</tr>
+<tr>
+ <td>fetchOffset.retryIntervalMs</td>
+ <td>long</td>
+ <td>10</td>
+  <td>Milliseconds to wait before retrying to fetch Kafka offsets.</td>
+</tr>
+</table>
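+
+These optional settings are passed through the same `option` calls as everything else. A minimal
+sketch, reusing the Scala example above (the values chosen here are arbitrary illustrations, not
+recommendations):
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "topic1")
+      .option("startingOffset", "earliest")       // read each partition from its earliest offset
+      .option("failOnDataLoss", "false")          // tolerate deleted topics / out-of-range offsets
+      .option("kafkaConsumer.pollTimeoutMs", "1024")
+      .load()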
+
+Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g.,
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see the
+[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
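+
+For instance, ordinary Kafka consumer settings can be forwarded by prefixing them with `kafka.`.
+In this sketch the particular configs and values are arbitrary examples of the prefixing
+convention, not recommendations:
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("kafka.fetch.min.bytes", "1048576")             // becomes the consumer's fetch.min.bytes
+      .option("kafka.max.partition.fetch.bytes", "2097152")   // becomes max.partition.fetch.bytes
+      .option("subscribe", "topic1")
+      .load()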
+
+Note that the following Kafka params cannot be set; the Kafka source will throw an exception if they are:
+- **group.id**: Kafka source will create a unique group id for each query automatically.
+- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
+  where to start instead. Structured Streaming manages which offsets are consumed internally, rather
+  than relying on the Kafka consumer to do it. This ensures that no data is missed when new
+  topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
+  streaming query is started; resuming will always pick up from where the query left off.
+- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
+  DataFrame operations to explicitly deserialize the keys.
+- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
+  Use DataFrame operations to explicitly deserialize the values (see the sketch after this list).
+- **enable.auto.commit**: Kafka source doesn't commit any offset.
+- **interceptor.classes**: Kafka source always reads keys and values as byte arrays. It's not safe to
+  use ConsumerInterceptor as it may break the query.
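+
+As a sketch of what "use DataFrame operations to deserialize" can look like (assuming the `ds1`
+DataFrame from the Scala example above; the UTF-8 decoding below merely stands in for whatever
+real decoder, e.g. JSON or Avro, your payloads need):
+
+    import org.apache.spark.sql.functions.{col, udf}
+
+    // Simple case: keys/values are UTF-8 strings, so a SQL cast is enough.
+    val strings = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+    // General case: apply custom decoding logic to the raw value bytes.
+    val decodeValue = udf((bytes: Array[Byte]) => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
+    val decoded = ds1.select(decodeValue(col("value")).as("decodedValue"))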
+
+### Deploying
+
+As with any Spark application, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using `--packages`, for example:
+
+ ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+See [Application Submission Guide](submitting-applications.html) for more details about submitting
+applications with external dependencies.